Vignette - data flow#

What follows is a quick tour under the hood of Hylode…

In vignette_1_training_set, we miraculously wound up with a usable set of features on running retro_dataset. This is of course because they had been lovingly built by Nel in advance.

This notebook is intended as a leg-up in getting your bearings around how Hylode ingests and processes data from EMAP. I include the different pieces of code and ways of thinking that have helped me, in the hope they will help others.

An over-arching view#

As good a starting point as any is the HySys architectural diagram linked to here. (You need to be logged into GitHub to view it.)

This picture gives an overview of how the system fits together. In terms of data ingestion, we can see HyFlow and HyGear, the two components responsible for fetching and transforming the data from EMAP (& other sources). Then sitting above them, there is HyCommand which controls different requests for the various subcomponents.

In the current version, an example of how HyCommand does its work can be found in the ICU Demand dag. This scheduled code triggers the appropriate actions from HyFlow and HyGear for initial ingestion and transformation of the data.

(The PowerPoint slide DAG.pptx in this directory (download by opening in a new window) shows you the complete set of operations the DAG triggers. Don’t be disheartened if this seems like a bit much. We will have a look at it piece-by-piece…)

An example: HyFlow fetch_episode_slices#

In the file that defines the DAG, let’s start with the code here:

        fetch_episode_slices_task = SimpleHttpOperator(
            task_id=f"fetch_episode_slices-{ward}",
            http_conn_id="hyflow_api",
            method="POST",
            endpoint="/trigger/fetch/icu/episode_slices",
            headers=default_http_headers,
            data=json.dumps(fetch_task_input),
            extra_options={
                "check_response": False
            },  # turn off the default response check
            response_check=skip_on_empty_result_set,  # add a custom response check
        )

This makes the API call to hyflow_api to fetch the episode slices for a given ward. The endpoint is beautifully documented in the HyFlow API docs (here at the time of writing). There we can see that fetch_episode_slices is designed to:

Append Hylode episode slices to the hyflow.icu_episode_slices_log table for episodes which were active on the ward at the horizon.

A Hylode episode is defined as a stay on a specific ward with a limited allowable break between bed location visits on that ward. An episode slice is a fraction, up to & incl 100%, of an episode.
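The response_check argument in the DAG snippet above takes a callable that is handed the HTTP response. What follows is only a guess at the shape of a check like skip_on_empty_result_set, not the Hylode implementation. The SkipTask exception, the FakeResponse class and the convention that an empty result set comes back as HTTP 204 are all assumptions made for illustration. In Airflow itself the skip would be signalled by raising AirflowSkipException.

```python
from dataclasses import dataclass


class SkipTask(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException (assumption)."""


@dataclass
class FakeResponse:
    """Minimal stand-in for a requests.Response, for illustration only."""

    status_code: int
    text: str = ""

    def raise_for_status(self) -> None:
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP error {self.status_code}")


def skip_on_empty_sketch(response) -> bool:
    """Hypothetical response check: skip the task when nothing was fetched."""
    # Assumption: the API signals an empty result set with 204 No Content.
    if response.status_code == 204:
        raise SkipTask("empty result set, skipping downstream tasks")
    # Any genuine error status should still fail the task.
    response.raise_for_status()
    return True
```

Turning off the default check (check_response: False) and supplying a custom one means the custom callable, rather than the operator’s built-in status check, decides whether a response counts as success, failure or a skip.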

SQL extraction code#

Digging a little bit deeper, we can trace this back to the SQL code. The code behind this endpoint lives in the function fetch_episode_slices, in the definition of the endpoint here. There we can see the following snippet:

    episode_slices_df = icu_episode_slices_from_emap(
        ward, horizon_dt, list(beds_df.hl7_location)
    )

Let’s perhaps have a look at what this icu_episode_slices_from_emap function is…

from hyflow.fetch.icu.icu_episode_slices import icu_episode_slices_from_emap
??icu_episode_slices_from_emap

Okay… so looking at this we can see that this function first calls _icu_location_visits_from_emap

from hyflow.fetch.icu.icu_episode_slices import _icu_location_visits_from_emap
??_icu_location_visits_from_emap

…which in turn runs a SQL query from the file emap__icu_location_visit_history.sql. Looking this up in the Hylode code, we find the corresponding file here and can run the query in DBForge (being sure to substitute values for the parameters prefixed by %).

Alternatively we can do that here in a notebook… (see appendix 1)
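Before pasting a query into DBForge it helps to know which parameters need substituting. The sketch below assumes the SQL file uses the %(name)s placeholder style, which is what passing a params dict through pandas to a PostgreSQL driver implies. The query text here is made up for illustration (table and column names are hypothetical); only the parameter names horizon_dt and locations come from this notebook.

```python
import re

# Hypothetical query text; the real emap__icu_location_visit_history.sql will differ.
example_sql = """
SELECT lv.hospital_visit_id, lv.admission_time, lv.discharge_time
FROM location_visit lv
WHERE lv.admission_time <= %(horizon_dt)s
  AND (lv.discharge_time IS NULL OR lv.discharge_time > %(horizon_dt)s)
  AND lv.location_string = ANY(%(locations)s)
"""


def named_parameters(sql: str) -> list:
    """Return the distinct %(name)s placeholders, in order of first appearance."""
    seen = []
    for name in re.findall(r"%\((\w+)\)s", sql):
        if name not in seen:
            seen.append(name)
    return seen


print(named_parameters(example_sql))  # ['horizon_dt', 'locations']
```

Each name printed is something you would replace by hand with a literal value in DBForge, or supply through the params argument when running the query from Python.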

Processing and storage in hylode_db#

Following through the rest of the definition of icu_episode_slices_from_emap, we can see that this function goes on to call _coalesce_icu_location_visits_into_episode_slices, which turns the ICU location visits into our notion of episode slices (as described in the function’s docstring, which you can view using the ?? shortcut).
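To make the idea of coalescing concrete, here is a minimal sketch of grouping bed location visits into episodes using a maximum allowable break. It is not the Hylode function. The column names, the toy data and the four-hour threshold are all assumptions chosen for illustration.

```python
import pandas as pd

# Assumption: visits separated by no more than this gap belong to one episode.
MAX_BREAK = pd.Timedelta(hours=4)

visits = pd.DataFrame(
    {
        "patient_id": [1, 1, 1, 2],
        "admission_dt": pd.to_datetime(
            ["2021-07-16 08:00", "2021-07-16 13:00", "2021-07-17 09:00", "2021-07-16 10:00"]
        ),
        "discharge_dt": pd.to_datetime(
            ["2021-07-16 12:00", "2021-07-16 20:00", "2021-07-17 18:00", "2021-07-16 22:00"]
        ),
    }
)


def coalesce_visits(visits_df: pd.DataFrame, max_break: pd.Timedelta = MAX_BREAK) -> pd.DataFrame:
    """Merge consecutive visits of a patient whose gap is within max_break."""
    df = visits_df.sort_values(["patient_id", "admission_dt"]).copy()
    # End time of the same patient's previous visit (NaT for their first visit).
    prev_end = df.groupby("patient_id")["discharge_dt"].shift()
    # A new episode starts at a patient's first visit or after too long a break.
    new_episode = prev_end.isna() | ((df["admission_dt"] - prev_end) > max_break)
    # A running count of episode starts labels every visit with an episode number.
    df["episode_no"] = new_episode.cumsum()
    return df.groupby(["patient_id", "episode_no"], as_index=False).agg(
        episode_start=("admission_dt", "min"),
        episode_end=("discharge_dt", "max"),
    )


episodes = coalesce_visits(visits)
```

In the toy data, patient 1’s first two visits are an hour apart and merge into one episode, while the third visit the next morning starts a new one.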

Then, returning to the code for fetch_icu_episode_slices, we can see a call to df_to_hylode_db. This is where the dataframe extracted from EMAP, now restructured into episode slices, is stored in the Hylode databases.
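The append-to-a-log-table pattern can be tried out without touching the Hylode databases. The sketch below uses an in-memory SQLite database and pandas’ to_sql as a stand-in. It is not how df_to_hylode_db is implemented, and stamping each batch with a horizon_dt column is an assumption made for illustration.

```python
import sqlite3

import pandas as pd


def append_to_log(df: pd.DataFrame, table: str, conn, horizon_dt: str) -> int:
    """Append a batch of rows to a log table, stamped with its horizon."""
    stamped = df.assign(horizon_dt=horizon_dt)
    # if_exists="append" creates the table on first use, then adds rows to it.
    stamped.to_sql(table, conn, if_exists="append", index=False)
    return len(stamped)


conn = sqlite3.connect(":memory:")
slices = pd.DataFrame({"episode_slice_id": [101, 102], "ward": ["T03", "T03"]})

# Two fetches at different horizons append two batches to the same log.
append_to_log(slices, "icu_episode_slices_log", conn, "2021-07-17T07:00:00+01:00")
append_to_log(slices, "icu_episode_slices_log", conn, "2021-07-17T08:00:00+01:00")

row_count = pd.read_sql("SELECT COUNT(*) AS n FROM icu_episode_slices_log", conn)["n"].iloc[0]
```

Because rows are only ever appended, the table keeps a history of what was seen at each horizon rather than just the latest state.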

A very comparable process happens to bring in the observations into Hylode, so with some ferreting out (along the lines above) it should be possible to find the corresponding pieces of code. Next up is to transform the data from there…

Another example: HyGear transformers#

When talking about how the Hylode ML system works, discussion often comes back to the transformers. These are the pieces of code that take the data from a format not a million miles from that in EMAP into reproducible features for both retrospective model training and deployment.

As in our section above on HyFlow, the HyGear transformers are called on a schedule from the ICU Demand dag.

Take for instance the code here:

        generate_icu_temporal_task = SimpleHttpOperator(
            task_id=f"generate_icu_temporal-{ward}",
            http_conn_id="hygear_api",
            method="POST",
            endpoint="/trigger/generate/icu/cog1/temporal",
            headers=default_http_headers,
            data=json.dumps(transform_task_input),
        )

This makes the API call to hygear_api to generate the ICU patient temporal features (age, elapsed length-of-stay etc.) for a given ward. The endpoint is beautifully documented in the HyGear API docs (here at the time of writing). There we can see that temporal is designed to:

Append temporal features to the hygear.icu_temporal_log table for episode slices active on the ward at the horizon.

Transformers under the hood#

Again we can go back to the definition of the endpoint (found, in this case, here), where we have the function generate_icu_temporal. This code lets us look under the hood of the transformer. We can see that it pulls out the patients using icu_patients_from_hyflow and applies a series of Transformer classes to them, namely AdmissionAgeTransformer and LengthOfStayTransformer. Let’s have a look at one of these…

from hygear.transform.cog1.icu_temporal import AdmissionAgeTransformer
AdmissionAgeTransformer??

We can see that the transformer is a class. It takes a series of specified input_cols, and has a transform method that outputs the columns listed in output_cols. Included in Appendix 2 is some code to run this transformer across a dataframe. You can use this same structure to develop and test new transformers in a notebook.
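As a starting point for developing a new transformer, here is a self-contained sketch that runs without Hylode installed. SketchTransformer is a stand-in I have made up for BaseCog1Transformer (the real base class may behave differently), and ElapsedStayTransformer with its elapsed_los_hours column is hypothetical. Only the overall shape (input_cols, output_cols, transform, and the ward / horizon_dt / input_df constructor arguments) follows what this notebook shows.

```python
from datetime import datetime
from typing import List

import pandas as pd


class SketchTransformer:
    """Stand-in for BaseCog1Transformer; the real base class may differ."""

    input_cols: List[str] = []

    def __init__(self, ward: str, horizon_dt: datetime, input_df: pd.DataFrame):
        missing = set(self.input_cols) - set(input_df.columns)
        if missing:
            raise ValueError(f"missing input columns: {sorted(missing)}")
        self.ward = ward
        self.horizon_dt = horizon_dt
        # Work on a copy restricted to the declared inputs.
        self.input_df = input_df.loc[:, self.input_cols].copy()


class ElapsedStayTransformer(SketchTransformer):
    """Hypothetical transformer: hours from admission to the horizon."""

    input_cols = ["episode_slice_id", "admission_dt"]

    @property
    def output_cols(self) -> List[str]:
        return ["episode_slice_id", "elapsed_los_hours"]

    def transform(self) -> pd.DataFrame:
        output_df = self.input_df
        elapsed = pd.Timestamp(self.horizon_dt) - output_df["admission_dt"]
        output_df["elapsed_los_hours"] = elapsed.dt.total_seconds() / 3600
        return output_df.loc[:, self.output_cols]


horizon = datetime(2021, 10, 12, 11, 0)
toy_input_df = pd.DataFrame(
    {
        "episode_slice_id": [1, 2],
        "admission_dt": pd.to_datetime(["2021-10-12 05:00", "2021-10-11 11:00"]),
    }
)
los_df = ElapsedStayTransformer("T03", horizon, toy_input_df).transform()
```

Declaring input_cols up front lets the constructor fail early with a clear message when a column is missing, which is handy when feeding it hand-built dataframes in a notebook.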

Once this feature transformation is done and dusted, we are ready to use Nel’s HyCastle machinery to pull together our feature sets for model training.

Appendix 1: Running SQL in Jupyter#

(magpied from Nel’s existing HyMind exemplar)

from datetime import datetime, timedelta
import os
from pathlib import Path
from pprint import pprint
import urllib

import arrow
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sqlalchemy import create_engine

from hylib.dt import LONDON_TZ, convert_dt_columns_to_london_tz

ward = 'T03'

EMAP credentials#

EMAP credentials are allocated per user and not stored in the environment variables. You do not want your credentials to leak into the source repository.

One way of safeguarding them is to create a file called secret at the top level of the HyMind repository (one level above this notebook).
Do this here in Jupyter, not in a local copy of the repo.

The first line should be your UDS username and the second line should be your UDS password.

secret has been added to .gitignore and will be excluded from the repository.

Read your username & password into the environment:

os.environ['EMAP_DB_USER'], os.environ['EMAP_DB_PASSWORD'] = Path('../secret').read_text().strip().split('\n')
uds_host = os.getenv('EMAP_DB_HOST')
uds_name = os.getenv('EMAP_DB_NAME')
uds_port = os.getenv('EMAP_DB_PORT')
uds_user = os.getenv('EMAP_DB_USER')
uds_passwd = os.getenv('EMAP_DB_PASSWORD')
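The one-line read above is compact, but it gives an unhelpful unpacking error if the file has the wrong number of lines, and Windows line endings leave a stray \r on the username. A slightly more defensive reader might look like the sketch below. The function name read_credentials is my own invention, and the temporary file merely stands in for ../secret.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def read_credentials(path: Path) -> Tuple[str, str]:
    """Read a username and password from the first two non-empty lines of a file."""
    # splitlines() copes with both \n and \r\n line endings.
    lines = [line.strip() for line in path.read_text().splitlines() if line.strip()]
    if len(lines) != 2:
        raise ValueError(f"expected 2 non-empty lines in {path}, found {len(lines)}")
    return lines[0], lines[1]


# Demonstration with a throwaway file standing in for ../secret
with tempfile.TemporaryDirectory() as tmp:
    secret_path = Path(tmp) / "secret"
    secret_path.write_text("my_user\r\nmy_password\n\n")
    user, password = read_credentials(secret_path)
```

Note that this strips surrounding whitespace from both lines, so it would not suit a password that genuinely begins or ends with a space.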

Create a SQLAlchemy Engine for accessing the UDS:

emapdb_engine = create_engine(f'postgresql://{uds_user}:{uds_passwd}@{uds_host}:{uds_port}/{uds_name}')
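One thing to watch with the connection string above: if your password contains characters such as @, : or /, they need URL-escaping or the string will be parsed incorrectly. A minimal sketch of building an escaped URL with the standard library follows. The helper name postgres_url and the example values are hypothetical.

```python
from urllib.parse import quote_plus


def postgres_url(user: str, password: str, host: str, port, name: str) -> str:
    """Build a PostgreSQL connection URL with the credentials URL-escaped."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{name}"


# Made-up credentials containing characters that would otherwise break the URL
url = postgres_url("jbloggs", "p@ss:w/rd", "db.example.org", 5432, "uds")
print(url)  # postgresql://jbloggs:p%40ss%3Aw%2Frd@db.example.org:5432/uds
```

The resulting string can be passed to create_engine in place of the hand-built f-string.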

from hyflow.settings import SQL_DIR
from hylib.load.hydef import beds_from_hydef

# load the text of the query that HyFlow itself runs
visits_sql = (SQL_DIR / "emap__icu_location_visit_history.sql").read_text()

# the point-in-time we are interested in:  7am on 17/07/2021 BST
# (NB: astimezone() on a naive datetime treats it as system local time)
horizon_dt = datetime(2021, 7, 17, 7, 0, 0).astimezone(LONDON_TZ)

# the beds (HL7 locations) that make up the ward
beds_df = beds_from_hydef(ward)

visits_df = pd.read_sql(
    visits_sql,
    emapdb_engine,
    params={"horizon_dt": horizon_dt, "locations": list(beds_df.hl7_location)},
)
visits_df.head()
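A note on the horizon_dt line: calling astimezone() on a naive datetime interprets it in the machine’s local timezone, so it only means 7am London time if the server clock is itself set to London time. The sketch below builds the horizon explicitly instead. It uses the standard-library zoneinfo (Python 3.9+) as a stand-in for LONDON_TZ, whose exact type I have not checked. If LONDON_TZ turns out to be a pytz timezone, use its localize() method rather than the tzinfo argument.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

# Stand-in for hylib.dt.LONDON_TZ (assumption)
LONDON = ZoneInfo("Europe/London")

# 7am London wall-clock time on 17/07/2021, whatever the server's own timezone
explicit_horizon_dt = datetime(2021, 7, 17, 7, 0, 0, tzinfo=LONDON)

# In July, London is on BST (UTC+1), so this instant is 06:00 UTC
horizon_utc = explicit_horizon_dt.astimezone(timezone.utc)
bst_offset = explicit_horizon_dt.utcoffset()
```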

Appendix 2: Running some Transformer code#

from datetime import datetime
import logging
from typing import List

from fastapi import APIRouter
import numpy as np
import pandas as pd

from hylib.dt import LONDON_TZ
from hylib.load.hydef import icu_observation_types_from_hydef

from hyflow.load.icu.icu_episode_slices import icu_episode_slices_from_hyflow
from hyflow.load.icu.icu_observations import icu_observations_from_hyflow
from hyflow.load.icu.icu_patients import icu_patients_from_hyflow

from hygear.transform.cog1.base import BaseCog1Transformer

class AdmissionAgeTransformer(BaseCog1Transformer):
    """
    A transformer for age at admission

    Output Features:
        `admission_age_years`: float
            Patient's age in years
    """

    input_cols = ["episode_slice_id", "admission_dt", "dob"]

    @property
    def output_cols(self) -> List[str]:
        return ["episode_slice_id", "admission_age_years"]

    def years(self, row: pd.Series) -> float:
        # NB: difference in calendar years, so only accurate to within a year
        if pd.isnull(row["dob"]):
            return np.nan
        return float(row["admission_dt"].year - row["dob"].year)

    def transform(self) -> pd.DataFrame:
        # copy so that the caller's dataframe is not modified in place
        output_df = self.input_df.copy()

        output_df["admission_age_years"] = output_df.apply(self.years, axis=1)

        return output_df.loc[:, self.output_cols]

# ward was set in Appendix 1
ward

horizon_dt = datetime(2021, 10, 12, 11, 00).astimezone(LONDON_TZ)

# episode slices active on the ward at the horizon
episode_slices_df = icu_episode_slices_from_hyflow(ward, horizon_dt)
episode_slices_df.shape

patients_df = icu_patients_from_hyflow(
    ward, horizon_dt, list(episode_slices_df.episode_slice_id)
)

# assemble the transformer's input: admission_dt from the slices, dob from the patients
age_input_df = episode_slices_df.loc[:, ["episode_slice_id", "admission_dt"]].join(
    patients_df.loc[:, ["episode_slice_id", "dob"]].set_index("episode_slice_id"),
    on="episode_slice_id",
)

age_df = AdmissionAgeTransformer(ward, horizon_dt, age_input_df).transform()

# join the new feature back onto the full list of episode slices
output_df = episode_slices_df.loc[:, ["episode_slice_id"]].join(
    age_df.set_index("episode_slice_id"), on="episode_slice_id"
)

age_df
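One detail worth knowing when reading the output: the years method above subtracts calendar years, so it overstates a patient’s age by one whenever the admission falls before their birthday in that year. If an exact age in completed years matters for your feature, a sketch along these lines would do it. The function name age_in_years is hypothetical and not part of HyGear.

```python
from datetime import date


def age_in_years(dob: date, on: date) -> int:
    """Completed years between dob and on."""
    # Has the birthday already happened (or is it today) in the year of `on`?
    had_birthday = (on.month, on.day) >= (dob.month, dob.day)
    return on.year - dob.year - (0 if had_birthday else 1)


# Born on New Year's Eve 1950, admitted on New Year's Day 2021:
# the calendar-year difference says 71, but the patient is still 70.
print(age_in_years(date(1950, 12, 31), date(2021, 1, 1)))  # 70
```

Whether the extra precision is worth it depends on the model. For a coarse age feature the year difference may be perfectly adequate.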