Vignette - a Modelling Workflow#

In this notebook, we look at how the various pieces of the Hylode architecture come together to ease the ML4H model development/deployment process.

In vignette_1_training_set, we looked at how HyCastle and the lens abstraction make for consistent training pathways between model development and deployment.

Here, we bring these components together in a modelling workflow. Core steps are to show:

~ how HyCastle and the lens come together to make our training & validation sets
~ how we use MLFlow as a central spine for recording our model training
~ how we then can check out models from MLFlow - either for further evaluation or live deployment 

Imports#

We start with a long list of imports…

from typing import List
import os
import tempfile
from pathlib import Path
import pickle
from uuid import uuid4
import datetime

import cloudpickle
import yaml
import pandas as pd
from matplotlib import pyplot as plt
import mlflow
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import ParameterGrid, train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, log_loss
from sklearn.compose import ColumnTransformer
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import MissingIndicator, SimpleImputer
from sklearn.pipeline import Pipeline as SKPipeline
from sklearn.preprocessing import (
    FunctionTransformer,
    OneHotEncoder,
    OrdinalEncoder,
    StandardScaler,
)

%matplotlib inline
from hylib.dt import LONDON_TZ
from hycastle.lens.base import BaseLens
from hycastle.lens.transformers import DateTimeExploder, timedelta_as_hours
from hycastle.lens.icu import BournvilleICUSitRepLens
from hycastle.icu_store.live import live_dataset
from hycastle.icu_store.retro import retro_dataset
from hymind.lib.models.icu_aggregate import AggregateDemandModel
# initialise MLFlow
mlflow_var = os.getenv('HYMIND_REPO_TRACKING_URI')
mlflow.set_tracking_uri(mlflow_var)   

client = MlflowClient()

Training & Validation Sets#

Okay. So as our first port of call, we want to show how HyCastle and the lens abstraction can furnish training and validation sets for our modelling efforts.

To recap, we start here with the retro_dataset function from HyCastle.

We pass the argument ‘T03’ to restrict the patients we look at to the T03 ICU at UCLH. Because of the way Hylode has been configured for the ICU, this then gives one set of features (a row) per hour for every patient in our retrospective dataset.

Let’s have a look:

df = retro_dataset('T03')
df.shape
df.head()

Let’s check to see the date range that retro_dataset covers. (n.b. horizon_dt is the datetime for the particular hourly snapshot of a patient’s features)

df.horizon_dt.min(), df.horizon_dt.max()

Okay. So we see that the retrospective data currently in Hylode runs from c. April ‘21. For the sake of this demo, let’s take April, May & June as our training data. And then July for validation.

We define some corresponding datetimes…

start_train_dt = datetime.datetime(2021,4,3,2,0,0).astimezone(LONDON_TZ)
end_train_dt = datetime.datetime(2021,6,30,23,0,0).astimezone(LONDON_TZ)

start_valid_dt = datetime.datetime(2021,7,1,1,0,0).astimezone(LONDON_TZ)
end_valid_dt = datetime.datetime(2021,7,31,23,0,0).astimezone(LONDON_TZ)

Using these date ranges, we take slices of the retro_dataset corresponding to our training and validation windows.

train_df = df[(start_train_dt < df['horizon_dt']) & (df['horizon_dt'] < end_train_dt)]
valid_df = df[(start_valid_dt < df['horizon_dt']) & (df['horizon_dt'] < end_valid_dt)]
train_df.head()

This is a start, as we now have the appropriate time restrictions for a basic experimental setup – although you will notice that our train_df still has the full set of output features from HyCastle. As per our demonstration of the lens in the previous notebook, we want to restrict the set of features we see and pre-process them appropriately.

We start by defining a lens as below… (If this still looks a bit daunting, the appendix in the previous notebook on inspecting the different components of the lens might come in useful.)

class DemoLens(BaseLens):
    numeric_output = True
    index_col = "episode_slice_id"

    @property
    def input_cols(self) -> List[str]:
        return [
            "episode_slice_id",
            "admission_age_years",
            "avg_heart_rate_1_24h",
            "max_temp_1_12h",
            "avg_resp_rate_1_24h",
            "elapsed_los_td",
            "admission_dt",
            "horizon_dt",
            "n_inotropes_1_4h",
            "wim_1",
            "bay_type",
            "sex",
            "vent_type_1_4h",
        ]

    def specify(self) -> ColumnTransformer:
        return ColumnTransformer(
            [
                (
                    "select",
                    "passthrough",
                    [
                        "episode_slice_id",
                        "admission_age_years",
                        "n_inotropes_1_4h",
                        "wim_1",
                    ],
                ),
                ("bay_type_enc", OneHotEncoder(), ["bay_type"]),
                (
                    "sex_enc",
                    OrdinalEncoder(
                        handle_unknown="use_encoded_value", unknown_value=-1
                    ),
                    ["sex"],
                ),
                (
                    "admission_dt_exp",
                    DateTimeExploder(),
                    ["admission_dt", "horizon_dt"],
                ),
                (
                    "vent_type_1_4h_enc",
                    OrdinalEncoder(
                        handle_unknown="use_encoded_value", unknown_value=-1
                    ),
                    ["vent_type_1_4h"],
                ),
                (
                    "vitals_impute",
                    SimpleImputer(strategy="mean", add_indicator=False),
                    [
                        "avg_heart_rate_1_24h",
                        "max_temp_1_12h",
                        "avg_resp_rate_1_24h",
                    ],
                ),
                # note we include the elapsed length of stay as a feature for our model,
                # as an alternative to training multiple models for different timepoints
                (
                    "elapsed_los_td_hrs",
                    FunctionTransformer(timedelta_as_hours),
                    ["elapsed_los_td"],
                ),
            ]
        )

So what we want to do is apply this lens to train_df and valid_df to give us our feature sets.

# we start by instantiating the lens
lens = DemoLens()

# then we fit the lens on the training set, and transform that df
X_train = lens.fit_transform(train_df)

# similarly for the validation set, although here we only use transform(),
# as we have already fit the lens on train_df
X_valid = lens.transform(valid_df)

The ICU Demand pipeline also usefully includes our predictive label: whether or not the patient was discharged within 48 hours of the horizon_dt. We use this to define our targets:

y_train = train_df['discharged_in_48hr'].astype(int)
y_valid = valid_df['discharged_in_48hr'].astype(int)
X_train.shape, y_train.shape, X_valid.shape, y_valid.shape

To check that the lens has shaped our features and labels the way our SKLearn model expects, let’s pass them through a dummy Random Forest run:

m = RandomForestClassifier(n_jobs=-1)
%time m.fit(X_train.values, y_train.values.ravel())

Great! So with that, our training and validation sets are ready and we can start running experiments.
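As an optional extra check, it can also be worth glancing at the class balance before we start logging accuracy figures - the positive-class rate gives a baseline against which to read them:

# optional sanity check: the fraction of positive (discharged within 48h) labels
# in each split - a baseline against which to read the accuracy metrics logged below
y_train.mean(), y_valid.mean()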

MLFlow Training Workflow#

In Hylode modelling work to date, a central part of the workflow has been an open source software product from Databricks called MLFlow. Why have we felt the need to incorporate this into our stack?

Even working at an individual level, rigour around logging experimental results and outcomes yields a strong dividend. MLFlow provides a very flexible framework for achieving this - whether one is simply looking to keep house or aiming higher, perhaps at easy reproduction of results.

In the Hylode scenario, where we are looking at collaboration between multiple team members - both data scientists and software developers - the return from using a tool like MLFlow is an order of magnitude greater still. By centralising our models and performance data, MLFlow eases the handover of models trained by the HyMind team into models deployed by the HySys team.

Excellent MLFlow documentation can be found here.

Here our aims are modest. We just want to log a simple modelling workflow for ICU discharge. Key elements to look out for are:

~ the entire experiment logged against a single experiment ID
~ metadata about training and validation sets logged consistently in MLFlow
~ all metrics (accuracy etc.) logged from each training run on MLFlow
~ the actual model file for each training run stored in MLFlow (from where it's easy for the HySys team to check it out).

As a first step let’s create a new experiment. We use the pipe-separated (Owner|Type|Name|Date) naming convention to keep each other’s work from getting mixed up:

# Owner|Type|Name|Date e.g. 'TK|models|vignette|2021-11-22'
# n.b. if the experiment name already exists, this cell will throw an error
#
# => add a unique experiment below <=
exp_name =


os.environ["MLFLOW_EXPERIMENT_NAME"] = exp_name
experiment_id = mlflow.create_experiment(exp_name)

experiment_id

With this done, you should see that a new experiment now exists on the bottom left-hand side of the MLFlow web user interface (which we refer to as the HyMind Repo). At the time of writing, the HyMind Repo can be accessed here

Now. What we want to do as we move along the modelling pathway is to log the salient bits of information in MLFlow as we go. To make this easier to do, we start by defining a few convenience functions to log strings, dicts and lenses.

n.b. these all rely on mlflow.log_artifact(), which takes a file from our local directory and adds it to the HyMind Repo. (Alongside our import statements above, we tell MLFlow where our Repo is using the HYMIND_REPO_TRACKING_URI environment variable.)

tmp_path = Path('tmp')
tmp_path.mkdir(parents=True, exist_ok=True)

def mlflow_log_string(text, filename):
    full_path = tmp_path / filename
    with open(full_path, 'w') as f:
        f.write(str(text))
    mlflow.log_artifact(full_path)

def mlflow_log_tag_dict(tag_dict, filename):
    """Logs tag dict to MLflow (while preserving order unlike mlflow.log_dict)"""
    full_path = tmp_path / filename
    with open(full_path, 'w') as f:
        yaml.dump(tag_dict, f, sort_keys=False)
    mlflow.log_artifact(full_path)
    
def mlflow_log_lens(l):
    full_path = l.pickle(tmp_path)
    mlflow.log_artifact(full_path, 'lens')

First off, to get the hang of this, let’s run a test of sending some data to MLFlow. Let’s try storing the start and end times for our training and validation windows…

tag_dict = {
    'start_train_dt': start_train_dt,
    'end_train_dt': end_train_dt,    
    'start_valid_dt': start_valid_dt,
    'end_valid_dt': end_valid_dt
}
with mlflow.start_run():
    mlflow_log_tag_dict(tag_dict, 'tag_dict.yaml')

Now, if you navigate back to the HyMind Repo MLFlow UI and click on the new experiment that you created (on the bottom left hand side), you should see a new row in the table that lists runs for this experiment.

If you then click on this run, you can scroll down the page and under ‘Artifacts’ you will find a copy of tag_dict.yaml. This is now stored in the HyMind Repo to pull out as and when we need it.

One point worth mentioning about the with mlflow.start_run() syntax above: this is the standard MLFlow way to create a new run (nested under the current experiment). The with statement automatically closes the run at the end of the indented block.
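For completeness, here is a minimal sketch of the same run lifecycle managed by hand (the example_tag name is purely illustrative): mlflow.start_run() opens and returns the run, and mlflow.end_run() closes it explicitly.

# minimal sketch of managing the run lifecycle by hand rather than via `with`;
# handy if you want a run to stay open across several notebook cells
run = mlflow.start_run()           # opens a new run under the current experiment
print(run.info.run_id)             # each run has a unique id we can refer back to
mlflow.set_tag('example_tag', 1)   # anything logged now attaches to this open run
mlflow.end_run()                   # remember to close the run explicitly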

A fuller example#

With a slightly better sense of how MLFlow works, we now turn to a fuller exemplar workflow. The example we look at here is a simple parameter grid search for a Random Forest model of ICU discharge at 48 hours.

# the two most influential parameters 
# cf. https://scikit-learn.org/stable/modules/ensemble.html#parameters
grid = {
    'n_estimators':[10, 50, 100],
    'max_features':[None, "sqrt", "log2"]
}
# as the outcome of each training run (even with the same parameters) is non-deterministic,
# we run two training runs per parameter combination.
runs_per_param_set = 2

for i in range(runs_per_param_set):
    
    for g in ParameterGrid(grid):
        m = RandomForestClassifier(n_jobs=-1)

        with mlflow.start_run():
            
            # logging the tag dictionary, the run_type
            mlflow_log_tag_dict(tag_dict, 'tag_dict.yaml')
            mlflow.set_tag("run_type", "training")
            
            # set and log this run's set of model parameters
            m.set_params(**g)
            mlflow.log_params(g)

            m.fit(X_train.values, y_train.values.ravel())
            
            # calculate and log training and validation set accuracy
            train_accuracy = m.score(X_train.values, y_train.to_numpy())
            mlflow.log_metric('train_accuracy', train_accuracy)
            valid_accuracy = m.score(X_valid.values, y_valid.to_numpy())       
            mlflow.log_metric('valid_accuracy', valid_accuracy)
            
            # ditto for confusion matrices (n.b. sklearn expects y_true first, then y_pred)
            train_confusion = confusion_matrix(y_train.to_numpy(), m.predict(X_train.values))
            mlflow_log_string(train_confusion, 'train_confusion.txt')
            valid_confusion = confusion_matrix(y_valid.to_numpy(), m.predict(X_valid.values))
            mlflow_log_string(valid_confusion, 'valid_confusion.txt')

            # store the trained SKLearn model, so we can check it out later
            mlflow.sklearn.log_model(m, 'model')

After this cell runs (which takes a minute or two), return to the MLFlow UI and you will see that the experiment you created is now populated with a whole list of runs - one for each training run in the loop above. Clicking into a run, you will see that all the attributes above have been stored (the tag dictionary, the parameters, the metrics, the model, etc.).

As a next step, we might want to pick out the model parameters that seem to have performed best - so we can then use these for further evaluation.

These runs can also be straightforwardly accessed from a notebook using mlflow.search_runs():

runs = mlflow.search_runs()
runs.head()

From this starting point, it’s simple to pick out the parameter set with the best mean validation accuracy, as follows:

params = [col for col in runs if col.startswith('params')]
best_params = runs.groupby(params)['metrics.valid_accuracy'].mean().idxmax()
best_row = runs.set_index(keys=params).loc[best_params]

best_run_id = list(best_row['run_id'])[0]
best_run_id

And then we can tag this as the best run from our training loop - and also log the lens we used to train it:

with mlflow.start_run(run_id=best_run_id):
    # tag the run as the best run from the training loop
    mlflow.set_tag('best_run', 1)   

    # log the lens
    mlflow_log_lens(lens)

In the same breath, MLFlow gives us the option to register our model, which makes it easy to access and work with going forward - so let’s do that too:

# => add a unique model name below <=
# e.g. tk-random_forest-demo
model_name =
# n.b. each time you run this cell with the same model_name, the model version will increase by one
registered_model = mlflow.register_model(f'runs:/{best_run_id}/model', model_name)

Which is great. And now you should be able to navigate to the MLFlow UI - and if you click on the ‘Models’ tab at the top of the page you should see your newly registered model waiting there on the list.
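If you prefer to check this from the notebook rather than the UI, the tracking client can also list what sits under a registered model name. A small sketch, assuming the client and model_name defined above:

# sketch: list the latest registered version(s) under model_name, one per stage
for mv in client.get_latest_versions(model_name):
    print(mv.name, mv.version, mv.current_stage)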

Checking models out from MLFlow#

By this stage of the notebook, we have invested quite a lot of effort in creating a parallel record of our experiment in MLFlow. In the final section of the notebook, we seek to show how this investment pays off. We work through two principal workflows:

~ checking out the model for forward pass prediction
~ checking it out for further evaluation

But let’s start simple by retrieving the model. First, a couple of simple methods that allow us to pull out info about the model we have just saved:

# first off, we can surface basic info about the model using our model_name and version.

model_info = client.get_model_version(model_name, registered_model.version)
model_info
# we can then go deeper and inspect the run itself
run_info = client.get_run(model_info.run_id)
run_info

Happy that the information above looks about right, we can now use the model_name and version to load our model:

model = mlflow.sklearn.load_model(f'models:/{model_name}/{registered_model.version}')
model

Moreover, using model_info.run_id, we can also reload the lens we used to train the model:

with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    
    client.download_artifacts(model_info.run_id, 'lens', tmp_dir)
    
    lens_path = next((tmp_dir / 'lens').rglob('*.pkl'))
    with open(lens_path, 'rb') as f:
        loaded_lens = pickle.load(f)
        
loaded_lens

Forward pass prediction#

With the model and the lens loaded, the live_dataset function from HyCastle makes it extremely straightforward to run the forward pass. (n.b. we reuse exactly the same components as in our retrospective training.)

live_df = live_dataset('T03')
live_df.shape
# and inspecting the dataframe, note the most recent admission_dt
live_df.loc[:, ['episode_slice_id', 'admission_dt', 'bed_code', 'avg_heart_rate_1_24h']].sort_values('admission_dt', ascending=False).head()

Now let’s try to run some patient-level predictions based on our saved model:

# first we transform the live_df with our loaded_lens
X_df = loaded_lens.transform(live_df)
X_df.columns
# making the predictions
predictions = model.predict_proba(X_df.values)

# adding the predictions to our live_df dataframe
live_df['prediction'] = predictions[:, 1]
live_df.loc[:, ['episode_slice_id', 'prediction']].head()

We can even then get a sense of how this segues into the aggregate prediction problem, using the AggregateDemandModel class:

AggregateDemandModel??
agg_demand = AggregateDemandModel()
agg_predictions = agg_demand.predict(context="", 
                                     model_input=live_df.loc[:, ['prediction']].rename(mapper={'prediction':'prediction_as_real'},axis=1))
agg_predictions.plot()

Further evaluation#

Another use case: having done our initial training, we still have plenty of work to do evaluating the model’s performance. We give a very simple outline here of what that might look like.

We start by putting together our two components loaded from MLFlow - loaded_tag_dict and loaded_lens - to rebuild our validation set.

with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    
    client.download_artifacts(model_info.run_id, './', tmp_dir)
    
    tag_dict_path = tmp_dir / 'tag_dict.yaml'
    with open(tag_dict_path, 'r') as stream:
        loaded_tag_dict = yaml.load(stream, Loader=yaml.FullLoader)
        
loaded_tag_dict
loaded_valid_df = df[(loaded_tag_dict['start_valid_dt'] < df['horizon_dt']) &
                 (df['horizon_dt'] < loaded_tag_dict['end_valid_dt'])]

We recreate our validation features and labels as follows:

X_valid = loaded_lens.transform(loaded_valid_df)
y_valid = loaded_valid_df['discharged_in_48hr'].astype(int)
# the model itself was already loaded in the previous section
model
with mlflow.start_run(run_id=best_run_id):
    
    mlflow_log_tag_dict(tag_dict, 'tag_dict.yaml')
    
    # create a 2-column dataframe of the predicted probabilities and true label,
    # for each patient in the validation set
    eval_df = pd.DataFrame({
                'predict_proba':model.predict_proba(X_valid.values)[:,1], 
                'label':y_valid.to_numpy().ravel()
               }, 
        columns=['predict_proba','label'],
        index=X_valid.index)   
    eval_df['horizon_dt'] = loaded_valid_df.set_index('episode_slice_id')['horizon_dt']
    
    # write eval_df to csv and log in MLFlow
    eval_path = tmp_path / 'eval.csv'
    eval_df.to_csv(eval_path)
    mlflow.log_artifact(eval_path)
    
    
    # use eval_df to store a new metric
    eval_log_loss = log_loss(eval_df['label'],eval_df['predict_proba'])
    mlflow.log_metric('log_loss', eval_log_loss)
    
    
    # save a new confusion matrix figure alongside our registered model
    # (use the loaded `model` rather than `m`; sklearn expects y_true first, and the
    # classes are ordered [0, 1], i.e. remained first, then discharged)
    eval_confusion = confusion_matrix(y_valid.to_numpy(), model.predict(X_valid.values))
    disp = ConfusionMatrixDisplay(confusion_matrix=eval_confusion,
                              display_labels=['remained_after_48hrs', 'discharged'])
    
    confusion_path = tmp_path / 'confusion_fig_2.png'
    disp.plot(cmap=plt.cm.Blues).figure_.savefig(confusion_path)
    mlflow.log_artifact(confusion_path)
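As an optional final step (sketched here rather than run), the MLFlow model registry also supports stages, which gives the HySys team an unambiguous pointer to the version that is ready for deployment:

# optional sketch: promote the version we registered earlier to the 'Staging' stage,
# so the deployment side can load it via the stable URI models:/<model_name>/Staging
client.transition_model_version_stage(
    name=model_name,
    version=registered_model.version,
    stage='Staging',
)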