Introduction

With ML workflows, it is often not sufficient to train and deploy a given model just once. Even if the model initially has the desired accuracy, that can change if the data used for prediction requests becomes, perhaps over time, sufficiently different from the data used to originally train the model.

When new data that could be used to retrain a model becomes available, it can be helpful to apply techniques for analyzing data 'drift', and for determining whether the drift is sufficiently anomalous to warrant retraining. It can also be useful to trigger such an analysis, and a potential re-run of your training pipeline, automatically upon the arrival of new data.

This blog post highlights an example notebook that shows how to set up such a scenario with Kubeflow Pipelines (KFP). It shows how to build a pipeline that checks for statistical drift across successive versions of a dataset and uses that information to decide whether to (re)train a model¹, and how to configure event-driven deployment of pipeline jobs when new data arrives.

The notebook builds on an example highlighted in a previous blog post, which shows a KFP training and serving pipeline, and introduces two primary new concepts:

  • the example demonstrates how to use the TensorFlow Data Validation (TFDV) library to build pipeline components that derive dataset statistics and detect drift between older and newer dataset versions, and shows how to use drift information to decide whether to retrain a model on newer data.
  • the example shows how to support event-triggered launch of Kubeflow Pipelines runs from a Cloud Functions (GCF) function, where the function run is triggered by the addition of a file to a given Cloud Storage (GCS) bucket.

The machine learning task uses a tabular dataset that joins London bike rental information with weather data, and trains a Keras model to predict rental duration. See this and this blog post, and the associated README, for more background on the dataset and model architecture.


A pipeline run using TFDV-based components to detect 'data drift'.

Running the example notebook

The example notebook requires a Google Cloud Platform (GCP) account and project, ideally with quota for using GPUs. As detailed in the notebook, it also requires an installation of AI Platform Pipelines (Hosted Kubeflow Pipelines), that is, an installation of KFP on Google Kubernetes Engine (GKE), with a few additional configurations once installation is complete.

The notebook can be run using either Colab (open directly) or AI Platform Notebooks (open directly).

Creating TFDV-based KFP components

Our first step is to build the TFDV components that we want to use in our pipeline.

Note: For this example, our training data is in GCS, in CSV-formatted files, so we can take advantage of TFDV's ability to process CSV files. The TFDV libraries can also process files in TFRecord format.
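For illustration, if the dataset were instead stored as TFRecord files of serialized tf.Example records, the stats-generation call would use TFDV's TFRecord entry point rather than the CSV one. This is a minimal sketch only; the paths here are placeholders, and the data in this example really is CSV:

import tensorflow_data_validation as tfdv

# Hypothetical paths, for illustration only.
stats = tfdv.generate_statistics_from_tfrecord(
    data_location='gs://YOUR_BUCKET/path/train-*.tfrecord',
    output_path='gs://YOUR_BUCKET/path/stats/train_stats.pb')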

We’ll define both TFDV KFP pipeline components as ‘lightweight’ Python-function-based components. For each component, we define a function, then call kfp.components.func_to_container_op() on that function to build a reusable component in .yaml format. Let’s take a closer look at how this works (details are in the notebook).

Below is the Python function we'll use to generate TFDV statistics from a collection of CSV files. The function, and the component we'll create from it, outputs the path to the generated stats file. When we define a pipeline that uses this component, we'll use this step's output as input to another pipeline step. TFDV uses a Beam pipeline (not to be confused with a KFP pipeline) to implement the stats generation. Depending upon its configuration, the component can use either the Direct (local) runner or the Dataflow runner. Running the Beam pipeline on Dataflow rather than locally can make sense for large datasets.

from typing import NamedTuple

def generate_tfdv_stats(input_data: str, output_path: str, job_name: str, use_dataflow: str,
                        project_id: str, region:str, gcs_temp_location: str, gcs_staging_location: str,
                        whl_location: str = '', requirements_file: str = 'requirements.txt'
) -> NamedTuple('Outputs', [('stats_path', str)]):

  import logging
  import time

  import tensorflow_data_validation as tfdv
  import tensorflow_data_validation.statistics.stats_impl
  from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

  logging.getLogger().setLevel(logging.INFO)
  logging.info("output path: %s", output_path)
  logging.info("Building pipeline options")
  # Create and set your PipelineOptions.
  options = PipelineOptions()

  if use_dataflow == 'true':
    logging.info("using Dataflow")
    if not whl_location:
      logging.warning('tfdv whl file required with dataflow runner.')
      exit(1)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = '{}-{}'.format(job_name, str(int(time.time())))
    google_cloud_options.staging_location = gcs_staging_location
    google_cloud_options.temp_location = gcs_temp_location
    google_cloud_options.region = region
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    setup_options = options.view_as(SetupOptions)
    setup_options.extra_packages = [whl_location]
    setup_options.requirements_file = requirements_file

  tfdv.generate_statistics_from_csv(
    data_location=input_data, output_path=output_path,
    pipeline_options=options)

  return (output_path, )

To turn this function into a KFP component, we’ll call kfp.components.func_to_container_op(). We’re passing it a base container image to use: gcr.io/google-samples/tfdv-tests:v1. This base image has the TFDV libraries already installed, so that we don’t need to install them ‘inline’ when we run a pipeline step based on this component.

import kfp
kfp.components.func_to_container_op(generate_tfdv_stats,
    output_component_file='tfdv_component.yaml', base_image='gcr.io/google-samples/tfdv-tests:v1')

We'll take the same approach to build a second TFDV-based component, one which detects drift between datasets by comparing their stats. The TFDV library makes this straightforward. We're using a drift comparator appropriate for a regression model (as used in the example pipeline) and looking for drift on a given set of fields (in this case, for example purposes, just one). The tensorflow_data_validation.validate_statistics() call will then tell us whether the drift anomaly for that field is over the specified threshold. See the TFDV docs for more detail.

  schema1 = tfdv.infer_schema(statistics=stats1)
  tfdv.get_feature(schema1, 'duration').drift_comparator.jensen_shannon_divergence.threshold = 0.01
  drift_anomalies = tfdv.validate_statistics(
      statistics=stats2, schema=schema1, previous_statistics=stats1)

(The full details of this second component definition are in the example notebook; a sketch of how such a component might look follows.)
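Here is a minimal sketch of what such a 'lightweight' component function could look like, built around the snippet above. The function name, the anomaly check, and the reuse of the tfdv-tests base image are assumptions made for illustration, not necessarily what the notebook does. The component outputs a 'drift' string that a downstream conditional step can test, and treats an empty stats_older_path as 'no previous stats available', in which case training should proceed:

from typing import NamedTuple

import kfp

def tfdv_detect_drift(
    stats_older_path: str, stats_new_path: str
) -> NamedTuple('Outputs', [('drift', str)]):
  # Hypothetical sketch; see the example notebook for the actual component code.
  import logging
  import tensorflow_data_validation as tfdv

  logging.getLogger().setLevel(logging.INFO)
  # If there are no stats from a previously-used dataset, always proceed with training.
  if not stats_older_path:
    return ('true', )

  # Load the stats generated for the older and newer dataset versions.
  stats1 = tfdv.load_statistics(stats_older_path)
  stats2 = tfdv.load_statistics(stats_new_path)

  # Infer a schema from the older stats, set the drift comparator threshold on
  # the 'duration' field, and validate the newer stats against it.
  schema1 = tfdv.infer_schema(statistics=stats1)
  tfdv.get_feature(schema1, 'duration').drift_comparator.jensen_shannon_divergence.threshold = 0.01
  drift_anomalies = tfdv.validate_statistics(
      statistics=stats2, schema=schema1, previous_statistics=stats1)
  logging.info('drift anomalies: %s', drift_anomalies)

  # Treat any reported anomaly as 'drift detected'.  (This check is an
  # assumption; the notebook may inspect the anomalies proto differently.)
  drift = 'true' if len(drift_anomalies.anomaly_info) > 0 else 'false'
  return (drift, )

# Build the reusable component file, assuming the same TFDV base image as before.
kfp.components.func_to_container_op(tfdv_detect_drift,
    output_component_file='tfdv_drift_component.yaml',
    base_image='gcr.io/google-samples/tfdv-tests:v1')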

Defining a pipeline that uses the TFDV components

After we've defined both TFDV components (one to generate stats for a dataset, and one to detect drift between datasets), we're ready to build a Kubeflow Pipeline that uses these components in conjunction with previously-built components for a training and serving workflow.

Instantiate pipeline ops from the components

KFP components in yaml format are shareable and reusable. We'll build our pipeline by starting with some already-built components (described in more detail here) that support our basic 'train/evaluate/deploy' workflow.

We'll instantiate pipeline ops from these pre-existing components by loading them via URL:

import kfp.components as comp

# pre-existing components
train_op = comp.load_component_from_url(
  'https://raw.githubusercontent.com/amygdala/code-snippets/master/ml/kubeflow-pipelines/keras_tuner/components/train_component.yaml'
  )
... etc. ...

… then create our TFDV ops from the new components we just built:


tfdv_op = comp.load_component_from_file(
  'tfdv_component.yaml'
  )
tfdv_drift_op = comp.load_component_from_file(
  'tfdv_drift_component.yaml'
  )

Then, we define a KFP pipeline from these ops. We're not showing the pipeline in full here; see the notebook for details. Two pipeline steps are based on the tfdv_op, which generates the stats: tfdv1 generates stats for the test data, and tfdv2 for the training data. In the following, you can see that the tfdv_drift step (based on the tfdv_drift_op) takes as input the output of the tfdv2 (training data stats) step.

@dsl.pipeline(
  name='bikes_weather_tfdv',
  description='Model bike rental duration given weather'
)
def bikes_weather_tfdv(
  ... other pipeline params ...
  working_dir: str = 'gs://YOUR/GCS/PATH',
  data_dir: str = 'gs://aju-dev-demos-codelabs/bikes_weather/',
  project_id: str = 'YOUR-PROJECT-ID',
  region: str = 'us-central1',
  requirements_file: str = 'requirements.txt',
  job_name: str = 'test',
  whl_location: str = 'tensorflow_data_validation-0.26.0-cp37-cp37m-manylinux2010_x86_64.whl',
  use_dataflow: str = '',
  stats_older_path: str = 'gs://aju-dev-demos-codelabs/bikes_weather_chronological/evaltrain1.pb'
  ):
  ...

  tfdv1 = tfdv_op(  # TFDV stats for the test data
    input_data='%stest-*.csv' % (data_dir,),
    output_path='%s/tfdv_expers/%s/eval/evaltest.pb' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
    job_name='%s-1' % (job_name,),
    use_dataflow=use_dataflow,
    project_id=project_id, region=region,
    gcs_temp_location='%s/tfdv_expers/tmp' % (working_dir,),
    gcs_staging_location='%s/tfdv_expers' % (working_dir,),
    whl_location=whl_location, requirements_file=requirements_file
    )
  tfdv2 = tfdv_op(  # TFDV stats for the training data
    input_data='%strain-*.csv' % (data_dir,),
    output_path='%s/tfdv_expers/%s/eval/evaltrain.pb' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
    job_name='%s-2' % (job_name,),
    use_dataflow=use_dataflow,
    project_id=project_id, region=region,
    gcs_temp_location='%s/tfdv_expers/tmp' % (working_dir,),
    gcs_staging_location='%s/tfdv_expers' % (working_dir,),
    whl_location=whl_location, requirements_file=requirements_file
    )

  # compare generated training data stats with stats from a previous version
  # of the training data set.
  tfdv_drift = tfdv_drift_op(stats_older_path, tfdv2.outputs['stats_path'])

  # proceed with training if drift is detected (or if no previous stats were provided)
  with dsl.Condition(tfdv_drift.outputs['drift'] == 'true'):
    train = train_op(...)
    eval_metrics = eval_metrics_op(...)
    with dsl.Condition(eval_metrics.outputs['deploy'] == 'deploy'):
      serve = serve_op(...)

While not all pipeline details are shown, you can see that this pipeline definition includes some conditional expressions: parts of the pipeline run only if an output of an 'upstream' step meets the given condition. We start the model training step if drift anomalies were detected (or if no previous stats were provided). And, once training has completed, we deploy the model for serving only if its evaluation metrics meet certain thresholds.

Here's the DAG for this pipeline. You can see the conditional expressions reflected in its structure. Note that the stats generated for the test dataset have no downstream dependencies, while the stats for the training dataset are used as input to the drift detection step.


The pipeline DAG

Here’s a pipeline run in progress:


A pipeline run in progress.

See the example notebook for more details on how to run this pipeline.
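For reference, one way to launch a run of this pipeline from a notebook is via the KFP SDK client. The following is a hedged sketch rather than the notebook's exact code; the host URL and argument values are placeholders:

import kfp

client = kfp.Client(host='YOUR-KFP-HOST-URL')  # the Pipelines endpoint for your installation
client.create_run_from_pipeline_func(
    bikes_weather_tfdv,
    arguments={
        'working_dir': 'gs://YOUR/GCS/PATH',
        'project_id': 'YOUR-PROJECT-ID',
        'use_dataflow': '',   # empty string: run the TFDV Beam pipelines locally
    })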

Event-triggered pipeline runs

Once you have defined this pipeline, a useful next step is to run it automatically when an update to the dataset is available, so that each dataset update triggers an analysis of data drift and potential model (re)training.

We’ll show how to do this using Cloud Functions (GCF), by setting up a function that is triggered when new data is added to a GCS bucket.

Set up a GCF function to trigger a pipeline run when a dataset is updated

We’ll define and deploy a Cloud Functions (GCF) function that launches a run of this pipeline when new training data becomes available, as triggered by the creation or modification of a file in a ‘trigger’ bucket on GCS.

In most cases, you don't want to launch a new pipeline run for every new file added to a dataset, since a dataset is typically composed of a collection of files, to which you add or update multiple files in a batch. So, you don't want the 'trigger bucket' to be the dataset bucket (if the data lives on GCS), as that would launch unwanted pipeline runs. Instead, we'll trigger a pipeline run after the upload of a batch of new data has completed.

To do this, we'll use an approach where the 'trigger' bucket is different from the bucket used to store dataset files. 'Trigger files' uploaded to that bucket are expected to contain the path of the updated dataset as well as the path to the data stats file generated for the last model trained. A trigger file is uploaded once the new data upload has completed; that upload triggers a run of the GCF function, which in turn reads the new data path from the trigger file and launches the pipeline job.

Define the GCF function

To set up this process, we'll first define the GCF function in a file called main.py, along with an accompanying requirements file in the same directory that specifies the libraries to install before the function runs. The requirements file specifies the KFP SDK:

kfp==1.4

The code looks like this (with some detail removed): we parse the trigger file contents and use that information to launch a pipeline run. The code uses the values of several environment variables that we set when deploying the GCF function.

import logging
import os

import kfp
from kfp import dsl
from kfp import compiler
from kfp import components

from google.cloud import storage

PIPELINE_PROJECT_ID = os.getenv('PIPELINE_PROJECT_ID')
...etc...

def read_trigger_file(data, context, storage_client):
    """Read the contents of the trigger file and return as string.
    """
    ....
    bucket = storage_client.get_bucket(data['bucket'])
    blob = bucket.get_blob(data['name'])
    trigger_file_string = blob.download_as_string().strip()
    logging.info('trigger file contents: {}'.format(trigger_file_string))
    return trigger_file_string.decode('UTF-8')


def gcs_update(data, context):
    """Background Cloud Function to be triggered by Cloud Storage.
    """

    storage_client = storage.Client()
    # get the contents of the trigger file
    trigger_file_string = read_trigger_file(data, context, storage_client)
    trigger_file_info = trigger_file_string.strip().split('\n')
    # then run the pipeline using the given job spec, passing the trigger file contents
    # as parameter values.
    logging.info('running pipeline with id %s...', PIPELINE_ID)
    # create the client object
    client = kfp.Client(host=PIPELINE_HOST)
    # deploy the pipeline run
    run = client.run_pipeline(EXP_ID, 'bw_tfdv_gcf', pipeline_id=PIPELINE_ID,
                          params={'working_dir': WORKING_DIR,
                                  'project_id': PIPELINE_PROJECT_ID,
                                  'use_dataflow': USE_DATAFLOW,
                                  'data_dir': trigger_file_info[0],
                                  'stats_older_path': trigger_file_info[1]})

    logging.info('job response: %s', run)

Then we'll deploy the GCF function as follows. Note that we're specifying the gcs_update entry point (defined in main.py) and the trigger bucket, and that we're setting environment variables as part of the deployment.

gcloud functions deploy gcs_update --set-env-vars \
  PIPELINE_PROJECT_ID={PROJECT_ID},WORKING_DIR={WORKING_DIR},PIPELINE_SPEC={PIPELINE_SPEC},PIPELINE_ID={PIPELINE_ID},PIPELINE_HOST={PIPELINE_HOST},EXP_ID={EXP_ID},USE_DATAFLOW=true \
  --runtime python37 --trigger-resource {TRIGGER_BUCKET} --trigger-event google.storage.object.finalize

Trigger a pipeline run when new data becomes available

Once the GCF function is set up, it will run when a file is added to (or modified in) the trigger bucket. For this simple example, the GCF function expects trigger files of the following format, where the first line is the path to the updated dataset, and the second line is the path to the TFDV stats for the dataset used to train the previous model. More generally, such a trigger file can contain whatever information is necessary to parameterize the pipeline run.

gs://path/to/new/or/updated/dataset/
gs://path/to/stats/from/previous/dataset/stats.pb
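Once a batch of new data has finished uploading, writing such a trigger file could look like the following sketch, which uses the google.cloud.storage client; the bucket name and paths are placeholders, not values from the example:

from google.cloud import storage

TRIGGER_BUCKET = 'YOUR-TRIGGER-BUCKET'  # the bucket the GCF function watches
trigger_contents = (
    'gs://YOUR_BUCKET/bikes_weather_new_batch/\n'
    'gs://YOUR_BUCKET/tfdv_expers/previous_run/eval/evaltrain.pb')

client = storage.Client()
bucket = client.get_bucket(TRIGGER_BUCKET)
# Creating this object fires the google.storage.object.finalize event that
# the deployed GCF function listens for, which in turn launches a pipeline run.
bucket.blob('trigger_files/new_data.txt').upload_from_string(trigger_contents)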

Summary

This blog post showed how to build Kubeflow Pipeline components, using the TFDV libraries, to analyze datasets and detect data drift. Then, it showed how to support event-triggered pipeline runs via Cloud Functions.

The post didn’t include use of TFDV to visualize and explore the generated stats, but this example notebook shows how you can do that. You can also explore the samples in the Kubeflow Pipelines GitHub repo.

  1. In this example, we show full model retraining on a new dataset. An alternate scenario, not covered here, could involve tuning an existing model with new data.