HyMind Lab Example

Please make a copy of this notebook and do not edit in place

See the system diagram for an overview of the HYLODE system components referenced in this notebook.
(You will need to be signed into GitHub to view)

Packages

Available

import pkg_resources
installed_packages = pkg_resources.working_set
installed_packages_list = sorted([f'{i.key}=={i.version}' for i in installed_packages])
# Uncomment to list installed packages
# installed_packages_list
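To check for a single package programmatically rather than scanning the full listing, the standard library's importlib.metadata (Python 3.8+) does the job; a minimal sketch (the helper name package_version is ours):

```python
from importlib import metadata

def package_version(name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(name)
    except metadata.PackageNotFoundError:
        return None

package_version('pandas')
```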

Need more?

For a quick installation, uncomment & run the cell below, replacing ujson with the package you want

# !pip install ujson
# import ujson
# ujson.__version__

Packages installed this way will disappear when the container is restarted.

To have the package permanently available, please log a ticket on ZenHub requesting that the package be added to the HyMind Lab.

Some imports

from datetime import datetime, timedelta
import os
from pathlib import Path
from pprint import pprint
import urllib

import arrow
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sqlalchemy import create_engine

from hylib.dt import LONDON_TZ, convert_dt_columns_to_london_tz

Constants

ward = 'T03'

Database Connections

EMAP DB

Access to EMAP is required for several components of the system to function properly, including some of the functions you will want to run in the HyMind Lab.

EMAP credentials

EMAP credentials are allocated per user and are not stored in the environment variables. You do not want your credentials to leak into the source repository.

One way of safeguarding is to create a file called secret at the top level of the HyMind repository (next to this notebook).
Do this here in Jupyter and not in a local copy of the repo.

The first line should be your UDS username and the second line should be your UDS password.

secret has been added to .gitignore and will be excluded from the repository.

Read your username & password into the environment:

os.environ['EMAP_DB_USER'], os.environ['EMAP_DB_PASSWORD'] = Path('secret').read_text().strip().split('\n')
uds_host = os.getenv('EMAP_DB_HOST')
uds_name = os.getenv('EMAP_DB_NAME')
uds_port = os.getenv('EMAP_DB_PORT')
uds_user = os.getenv('EMAP_DB_USER')
uds_passwd = os.getenv('EMAP_DB_PASSWORD')

Create a SQLAlchemy Engine for accessing the UDS:

emapdb_engine = create_engine(f'postgresql://{uds_user}:{uds_passwd}@{uds_host}:{uds_port}/{uds_name}')
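Note that the plain f-string above produces a broken URL if your password contains characters such as @ or :. A defensive sketch using the standard library's quote_plus (the helper name postgres_url is ours, not part of hylib):

```python
from urllib.parse import quote_plus

def postgres_url(user, password, host, port, name):
    """Build a postgresql:// URL, escaping characters such as '@' and ':'
    that would otherwise break the connection string."""
    return f'postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{name}'

# emapdb_engine = create_engine(postgres_url(uds_user, uds_passwd, uds_host, uds_port, uds_name))
```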

HYLODE DB

The hylode database is a containerised instance of Postgres 12 used by the various components to store data for use further down the pipeline.
You can think of it as the medium of data flow for our system.
Unlike the UDS, it is private to us.
You don't need individual credentials; everything is baked into the environment variables.

There are several schemas, roughly one for each subsystem (see link to system diagram above).

Storing data in and retrieving data from the hylode database happens through the APIs provided by the hyflow, hygear & hycastle modules.
Direct interaction with the database is not an expected part of the HyMind workflow and is presented here for interest only.

db_host = os.getenv('HYLODE_DB_HOST')
db_name = os.getenv('HYLODE_DB_NAME')
db_user = os.getenv('HYLODE_DB_USER')
db_passwd = os.getenv('HYLODE_DB_PASSWORD')
db_port = os.getenv('HYLODE_DB_PORT')
hydb_engine = create_engine(f'postgresql://{db_user}:{db_passwd}@{db_host}:{db_port}/{db_name}')

HyDef

The hydef schema in the hylode database contains static reference data

Locations

beds_df = pd.read_sql(
    """
    select
        bed.id as bed_id
        ,bed.code as bed_code
        ,bed.source_id
        ,bay.code as bay_code
        ,bay.type as bay_type
        ,ward.code as ward_code
        ,ward.name as ward_name
        ,ward.type as ward_type
    from
        hydef.beds bed
    inner join hydef.bays bay on bed.bay_id = bay.id
    inner join hydef.wards ward on ward.code = bay.ward_code
    order by ward.code, bay.code, bed.code
    """,
    hydb_engine
)
beds_df

ICU Observation Types Catalogue

A growing list of observation types that we are interested in for the ICU pipeline

icu_obs_types = pd.read_sql('select * from hydef.icu_observation_types', hydb_engine)
icu_obs_types

HyCastle

The HyCastle component is responsible for serving features for both training and prediction.

HyCastle provides a high-level interface for getting clean, transformed features & labels from the pipeline.

To dig deeper, the hyflow & hygear packages are also available for use within the HyMind Lab.
Additionally, all the tables in the hydef, hyflow & hygear schemas in the hylode database can be accessed directly using the database connection defined above, just like the star schema on the uds 🙂.

from hycastle.icu_store import SITREP_FEATURES
from hycastle.icu_store.retro import retro_dataset
from hycastle.icu_store.live import live_dataset, emap_snapshot # <-- contains PII

Retrospective Data

training_df = retro_dataset(ward)
training_df.shape
training_df.head()
training_df.columns
training_df.episode_slice_id.duplicated().any()
training_df.isnull().any()
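SITREP_FEATURES (imported above) lists the feature columns; a hedged sketch of splitting the frame into a feature matrix and a label vector. The label column name discharged_in_48h is an assumption for illustration, as is the helper itself:

```python
def split_features_labels(df, feature_cols, label_col):
    """Split a dataset into a feature matrix and a label vector,
    keeping only the feature columns actually present in the frame."""
    cols = [c for c in feature_cols if c in df.columns]
    return df[cols], df[label_col]

# X, y = split_features_labels(training_df, SITREP_FEATURES, 'discharged_in_48h')
```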

Live Data

These functions return personally identifiable information

HyLode Live Episode Slices

prediction_df = live_dataset(ward)
prediction_df.shape
prediction_df

EMAP Live Census Snapshot

emap_df = emap_snapshot(ward)
emap_df.head()

Filter

Limit episode slices used for prediction to admissions that are in the EMAP census

prediction_df.csn.isin(emap_df.csn)
prediction_df = prediction_df[prediction_df.csn.isin(emap_df.csn)]
prediction_df

HyFlow

The HyFlow component is responsible for ingesting raw data from upstream data sources such as EMAP & Caboodle.

The idea is for HyMind Lab users to operate at the level of HyCastle and not to have to worry about raw data ingress during the modelling process.
Access to source data is still needed for exploration and feature engineering, which can be done through the hyflow package, available for use from within the HyMind Lab.
Additionally, all the tables in the hyflow schema in the hylode database can be accessed directly.

from hyflow.fetch.hydef import beds_from_hydef, icu_observation_types_from_hydef
from hyflow.fetch.icu_episode_slices import icu_episode_slices_from_emap, icu_episode_slices_from_hyflow
from hyflow.fetch.icu_patients import icu_patients_from_emap
from hyflow.fetch.icu_observations import icu_observations_from_emap

Minimal example using the HyFlow package

Fetch ICU Episode Slices from EMAP

# the point-in-time we are interested in:  7am on 17/07/2021 BST
horizon_dt = datetime(2021, 7, 17, 7, 0, 0).astimezone(LONDON_TZ)
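One caveat worth flagging: calling .astimezone() on a naive datetime interprets it in the system's local timezone first, so if the Lab container runs on UTC the line above would actually represent 8am BST rather than 7am. Attaching the timezone directly avoids the ambiguity; a sketch using the standard library's zoneinfo (Python 3.9+; LONDON_TZ from hylib may already be a compatible tzinfo object):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

LONDON = ZoneInfo('Europe/London')
# Construct the point-in-time as 7am London wall-clock time directly,
# independent of the system timezone
horizon_dt = datetime(2021, 7, 17, 7, 0, 0, tzinfo=LONDON)
```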
beds_df = beds_from_hydef(ward)
episode_slices_df = icu_episode_slices_from_emap(ward, horizon_dt, list(beds_df.hl7_location))

The HyFlow method adds the episode_key, as that is a HYLODE concept not available in EMAP.

episode_slices_df
# Attach HyDef bed_id to episode slice & drop HL7 location string
episode_slices_df = episode_slices_df.join(
    beds_df.loc[:, ['bed_id', 'hl7_location']].set_index('hl7_location'),
    on='hl7_location'
).drop(columns=['hl7_location'])
episode_slices_df
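The set_index + join pattern above is equivalent to pandas.merge; merge has the advantage that validate='m:1' raises immediately if the lookup side ever contains duplicate keys, which a plain join would silently turn into duplicated rows. A toy illustration (the data here is made up):

```python
import pandas as pd

left = pd.DataFrame({'hl7_location': ['A', 'B'], 'csn': [1, 2]})
lookup = pd.DataFrame({'hl7_location': ['A', 'B'], 'bed_id': [10, 20]})

# validate='m:1' asserts each lookup key appears at most once on the right
merged = left.merge(lookup, on='hl7_location', how='left', validate='m:1')
```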

Fetch matching Patients from EMAP for Episode Slices that are in HyFlow

# the point-in-time we are interested in:  8pm on 17/07/2021 BST
horizon_dt = datetime(2021, 7, 17, 20, 0, 0).astimezone(LONDON_TZ)
# get our saved Episode Slices
episode_slices_df = icu_episode_slices_from_hyflow(ward, horizon_dt)
episode_slices_df.head()
patients_df = icu_patients_from_emap(ward, horizon_dt, list(episode_slices_df.csn))
patients_df
# Attach HyFlow episode_slice_id to patient
patients_df = patients_df.join(
    episode_slices_df.loc[:, ['episode_slice_id', 'csn']].set_index('csn'),
    on='csn'
).drop(columns=['csn'])
patients_df

Fetch matching Observations of interest in EMAP for Episode Slices that are in HyFlow

lookback_hrs = 24 # size of the trailing window we are interested in
# the point-in-time we are interested in:  10am on 17/07/2021 BST
horizon_dt = datetime(2021, 7, 17, 10, 0, 0).astimezone(LONDON_TZ)
# get our saved Episode Slices
episode_slices_df = icu_episode_slices_from_hyflow(ward, horizon_dt)
episode_slices_df.head()
# get our reference list of observation types we care about
obs_types_df = icu_observation_types_from_hydef()
obs_types_df.head()
obs_df = icu_observations_from_emap(
    ward,
    horizon_dt,
    list(episode_slices_df.csn),
    list(obs_types_df.source_id),
    lookback_hrs
)
obs_df
# Attach HyDef observation type id to observation
obs_df = obs_df.join(
    obs_types_df.loc[:, ['obs_type_id', 'source_id']].set_index('source_id'),
    on='source_id'
)
# Attach HyFlow episode_slice_id to observation
obs_df = obs_df.join(
    episode_slices_df.loc[:, ['episode_slice_id', 'csn']].set_index('csn'),
    on='csn'
).drop(columns=['csn'])
obs_df

Accessing the hyflow schema directly

Directly querying the hylode database will return personally identifiable information

Like most tables in the hylode database, the hyflow schema tables are all immutable logs.
That means data is only ever appended, never updated in place.

This also means, for example, that an individual patient will have many records in the icu_patients_log table,
one for each slice that was taken while their episode was active.

Important notes about direct Hylode DB access:
The queries provided through the functions in the hyflow & hygear packages take care of removing duplicates.
If you access the schemas directly you will need to do that yourself - see the various hyflow__*.sql files in hygear/load/sql for examples of partitioning over episode_slice_id and horizon_dt columns.
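In pandas, the same deduplication can be sketched as sorting by log_dt and keeping the last entry per (episode_slice_id, horizon_dt); a minimal stand-in for the SQL partitioning, assuming those column names:

```python
import pandas as pd

def latest_per_slice(df):
    """Keep only the most recently logged row per (episode_slice_id, horizon_dt),
    mimicking the window/partition logic in the hygear SQL."""
    return (
        df.sort_values('log_dt')
          .drop_duplicates(subset=['episode_slice_id', 'horizon_dt'], keep='last')
    )
```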

Other conveniences are provided by the packages.
For example, the Postgres/SQLAlchemy/Pandas stack does not support storing timedeltas directly (timedelta is a supported data type in both Postgres & Pandas, but SQLAlchemy is unable to pass it between them).
That means the raw span column in the hyflow.icu_episode_slices_log table is in nanoseconds.
The packages convert this to a timedelta for you, but you'll have to do it yourself if you access the raw tables.
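The nanosecond-to-timedelta conversion itself is one call in pandas:

```python
import pandas as pd

# e.g. a raw span column holding integer nanoseconds
spans_ns = pd.Series([3_600_000_000_000])        # one hour in nanoseconds
spans_td = pd.to_timedelta(spans_ns, unit='ns')  # proper Timedelta values
```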

ICU Episode Slices with Bed Id

sql_episode_slices_df = pd.read_sql(
    '''
        select 
            ep.id AS episode_slice_id
            , ep.episode_key
            , ep.csn
            , ep.admission_dt
            , ep.discharge_dt
            , beds.source_id AS bed
            , horizon_dt
            , log_dt
        from hyflow.icu_episode_slices_log ep
            inner join hydef.beds beds ON ep.bed_id = beds.id
            inner join hydef.bays bays ON beds.bay_id = bays.id
         WHERE bays.ward_code = %(ward)s
        order by episode_key, horizon_dt limit 1000
    ''', 
    hydb_engine,
    params={'ward': ward}
)
sql_episode_slices_df

ICU Patients

sql_patients_df = pd.read_sql('''
    select 
        id AS patient_log_id
        , episode_slice_id
        , mrn
        , name
        , dob
        , sex
        , ethnicity
        , postcode
        , horizon_dt
        , log_dt
    from hyflow.icu_patients_log 
    order by mrn, horizon_dt limit 1000''', 
    hydb_engine
)
sql_patients_df

ICU Observations

sql_obs_df = pd.read_sql('select * from hyflow.icu_observations_log order by episode_slice_id, horizon_dt limit 1000', hydb_engine)
sql_obs_df

HyGear

The HyGear component is responsible for transforming raw data collected by HyFlow into features that will be consumed by the models in HyMind.

The idea is that exploratory feature engineering happens on source data from HyFlow or EMAP in the HyMind Lab.
Once a transformation has been refined, it is added to the hygear package.

The hygear package itself is available for use from within the HyMind Lab.

Similarly, all the tables in the hygear schema in the hylode database can be accessed directly.
Like the hyflow tables shown above, these are also in the immutable log style.

from hygear.load.hydef import icu_observation_types_from_hydef
from hygear.load.hyflow import icu_episode_slices_from_hyflow, icu_patients_from_hyflow, icu_observations_from_hyflow
from hygear.load.hygear import icu_features_from_hygear

from hygear.transform.cog1.icu_therapeutics import (
    InotropeTransformer,
    NitricTransformer,
    RenalTherapyTransformer
)
from hygear.transform.cog1.icu_patient_state import (
    AgitationTransformer,
    DischargeStatusTransformer,
    PronedTransformer
)
from hygear.transform.cog1.icu_ventilation import TracheostomyTransformer, VentilationTypeTransformer
from hygear.transform.cog1.icu_vitals import (
    HeartRateTransformer,
    RespiratoryRateTransformer,
    TemperatureTransformer
)
from hygear.transform.cog1.icu_temporal import AdmissionAgeTransformer, LengthOfStayTransformer, DischargeLabelTransformer
from hygear.transform.cog2.icu_work_intensity import WorkIntensityTransformer

Single Horizon Example

# the point-in-time we are interested in:  10pm on 31/08/2021 BST
horizon_dt = datetime(2021, 8, 31, 22, 0, 0).astimezone(LONDON_TZ)

Fetch ICU Episode Slices active at a specific point-in-time

episode_slices_df = icu_episode_slices_from_hyflow(
    ward,
    horizon_dt
)
episode_slices_df

Fetch matching Patients for ICU Episode Slices active at a specific point-in-time

# fetch matching patients
patients_df = icu_patients_from_hyflow(
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id)
)
patients_df.head()
# join episode slices with patients
combined_df = episode_slices_df.join(
        patients_df.loc[:, ['episode_slice_id', 'mrn', 'dob', 'sex', 'ethnicity']].set_index('episode_slice_id'),
        on='episode_slice_id'
    ).drop(['log_dt', 'horizon_dt'], axis=1)
combined_df.head()

Fetch matching Observations for ICU Episode Slices active at a specific point-in-time

This is in long format: multiple rows per episode_slice_id

# number of trailing hours we are interested in
lookback_hrs = 24
# fetch matching observations
obs_df = icu_observations_from_hyflow(
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id),
    lookback_hrs
)
obs_df.head()
# fetch the observation types reference catalogue
obs_types_df = icu_observation_types_from_hydef()
# join observations with metadata
obs_df = obs_df.join(
    obs_types_df.set_index('obs_type_id'),
    on='obs_type_id'
)
obs_df.head()
# join observations with episode slices to get episode key
eps_obs_df = obs_df.join(
    episode_slices_df.loc[:, ['episode_slice_id', 'episode_key', 'admission_dt', 'discharge_dt']].set_index('episode_slice_id'),
    on='episode_slice_id'
)
eps_obs_df.groupby('episode_key')['obs_id'].count().rename('n_observations')
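If you need one row per episode slice instead of the long format, a pivot is a quick sketch. The key/value column names used here (name, value) are assumptions, so check them against obs_df.columns first:

```python
import pandas as pd

def obs_long_to_wide(df, index='episode_slice_id', key='name', value='value'):
    """Pivot long-format observations to one row per episode slice,
    keeping the last recorded value per observation type."""
    return df.pivot_table(index=index, columns=key, values=value, aggfunc='last')
```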

Fetch generated ICU Patient State Features for ICU Episode Slices active at a specific point-in-time

patient_state_df = icu_features_from_hygear(
    'patient_state',
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id)
)
patient_state_df
# join with patient state
combined_df = combined_df.join(
        patient_state_df.loc[:, ['episode_slice_id', 'is_proned_1_4h', 'discharge_ready_1_4h', 'is_agitated_1_8h']].set_index('episode_slice_id'),
        on='episode_slice_id'
    )
combined_df.head()

Fetch generated ICU Therapeutics Features for ICU Episode Slices active at a specific point-in-time

therapeutics_df = icu_features_from_hygear(
    'therapeutics',
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id)
)
therapeutics_df
# join with therapeutics
combined_df = combined_df.join(
        therapeutics_df.loc[:, ['episode_slice_id', 'n_inotropes_1_4h', 'had_nitric_1_8h', 'had_rrt_1_4h']].set_index('episode_slice_id'),
        on='episode_slice_id'
    )
combined_df.head()

Fetch generated ICU Ventilation Features for ICU Episode Slices active at a specific point-in-time

ventilation_df = icu_features_from_hygear(
    'ventilation',
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id)
)
ventilation_df
# join with ventilation
combined_df = combined_df.join(
        ventilation_df.loc[:, ['episode_slice_id', 'had_trache_1_12h', 'vent_type_1_4h']].set_index('episode_slice_id'),
        on='episode_slice_id'
    )
combined_df.head()

Fetch generated ICU Vitals Features for ICU Episode Slices active at a specific point-in-time

vitals_df = icu_features_from_hygear(
    'vitals',
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id)
)
vitals_df
# join with vitals
combined_df = combined_df.join(
        vitals_df.loc[:, ['episode_slice_id', 'avg_heart_rate_1_24h', 'max_temp_1_12h', 'avg_resp_rate_1_24h']].set_index('episode_slice_id'),
        on='episode_slice_id'
    )
combined_df.head()

Fetch generated ICU Work Intensity Metric Features for ICU Episode Slices active at a specific point-in-time

wim_df = icu_features_from_hygear(
    'work_intensity',
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id)
)
wim_df
# join with work intensity
combined_df = combined_df.join(
        wim_df.loc[:, ['episode_slice_id', 'wim_1']].set_index('episode_slice_id'),
        on='episode_slice_id'
    )
combined_df.head()

Fetch generated ICU Temporal Features for ICU Episode Slices active at a specific point-in-time

temporal_df = icu_features_from_hygear(
    'temporal',
    ward,
    horizon_dt,
    list(episode_slices_df.episode_slice_id)
)
temporal_df
# join with temporal
combined_df = combined_df.join(
        temporal_df.loc[:, ['episode_slice_id', 'elapsed_los_td', 'total_los_td', 'remaining_los_td',]].set_index('episode_slice_id'),
        on='episode_slice_id'
    )
combined_df.head()

Combined

combined_df.shape
combined_df.head()