Introduction

This post describes how to use Cloud Dataflow job templates to easily launch Dataflow pipelines from a Google App Engine (GAE) app, in order to support MapReduce jobs and many other data processing and analysis tasks.

This post builds on a previous post, which used a GAE Flexible service to periodically launch a Python Dataflow pipeline. The use of GAE Flex was necessary at the time, because we needed to install the gcloud sdk in the instance container(s) in order to launch the pipelines.

Since then, Cloud Dataflow templates have come into the picture for the Python SDK. Dataflow templates allow you to stage your pipelines on Google Cloud Storage and execute them from a variety of environments. This has a number of benefits:

  • With templates, you don’t have to recompile your code every time you execute a pipeline.
  • This means that you don’t need to launch your pipeline from a development environment or worry about dependencies.
  • It’s much easier for non-technical users to launch pipelines using templates. You can launch via the Google Cloud Platform Console, the gcloud command-line interface, or the REST API.

In this post, we’ll show how to use the Dataflow job template REST API to periodically launch a Dataflow templated job from GAE. Because we’re now simply calling an API, and no longer relying on the gcloud sdk to launch from App Engine, we can build a simpler App Engine Standard app.

With templates, you can use runtime parameters to customize the execution. We’ll use that feature in this example too.

The pipeline used in this example is nearly the same as that described in the earlier post; it analyzes data stored in Cloud Datastore — in this case, stored tweets fetched periodically from Twitter. The pipeline does several sorts of analysis on the tweet data; for example, it identifies important word co-occurrences in the tweets, based on a variant of the tf*idf metric.

Detecting important word co-occurrences in tweets

Defining a parameterized Dataflow pipeline and creating a template

The first step in building our app is creating a Dataflow template. We do this by building a pipeline and then deploying it with the --template_location flag, which causes the template to be compiled and stored at the given Google Cloud Storage (GCS) location.

You can see the pipeline definition here. It reads recent tweets from the past N days from Cloud Datastore, then splits into three processing branches. It finds the most popular words in terms of the percentage of tweets they were found in, calculates the most popular URLs in terms of their count, and then derives relevant word co-occurrences using an approximation to a tf*idf ranking metric. It writes the results to three BigQuery tables. (It would be equally straightforward to write results to Datastore instead/as well).

The dataflow pipeline graph.

The previous post in this series goes into a bit more detail about what some of the pipeline steps do, and how the pipeline accesses the Datastore.

As part of our new template-ready pipeline definition, we’ll specify that the pipeline takes a runtime argument, named timestamp. This value is used to filter out tweets N days older than the timestamp, so that the pipeline analyzes only recent activity.

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--timestamp', type=str)

Then, that argument can be accessed at runtime from a template-generated pipeline, as in this snippet:

  user_options = pipeline_options.view_as(UserOptions)
  ...
  wc_records = top_percents | 'format' >> beam.FlatMap(
      lambda x: [{'word': xx[0], 'percent': xx[1], 
                  'ts': user_options.timestamp.get()} for xx in x])

The example includes a template creation utility script called create_template.py, which sets some pipeline options, including the --template_location flag, defines the pipeline (via pipe.process_datastore_tweets()), and calls run() on it. The core of this script is shown below. Note that the pipeline_options dict doesn’t include timestamp; we’ll define that at runtime, not compile time.

import dfpipe.pipe as pipe
...
pipeline_options = {
    'project': PROJECT,
    'staging_location': 'gs://' + BUCKET + '/staging',
    'runner': 'DataflowRunner',
    'setup_file': './setup.py',
    'job_name': PROJECT + '-twcount',
    'temp_location': 'gs://' + BUCKET + '/temp',
    'template_location': 'gs://' + BUCKET + '/templates/' + PROJECT + '-twproc_tmpl'
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
pipe.process_datastore_tweets(PROJECT, DATASET, pipeline_options)

Because we used the --template_location flag, a template for that pipeline is compiled and saved to the indicated GCS location (rather than triggering a run of the pipeline).

Now that the template is created, we can use it to launch Dataflow pipeline jobs from our GAE app.

A note on input sources and template runtime arguments

As you can see from this table in the documentation, the Dataflow Python SDK does not yet support the use of runtime parameters with Datastore input.

For pipeline analysis, we want to consider only Datastore data from the last N days. But, because of the above constraint, we can’t access the runtime timestamp parameter when we’re constructing the Datastore reader query. (If you try, you will see a compile-time error). Similarly, if you try the approach taken by the non-template version of the pipeline here, which uses datetime.datetime.now() to construct its Datastore query, you’ll find that you’re always using the same compile-time static timestamp each time you run the template.

To work around this for the template version of this pipeline, we will include a filter step, that can access runtime parameters, and which filters out all but the last N days of tweets post-query. You can see this step as FilterDate in the Dataflow pipeline graph figure above.

Launching a Dataflow templated job from the Cloud Console

Before we actually deploy the GAE app, let’s check that we can launch a properly running Dataflow templated job from our newly generated template. We can do that by launching a job based on that template from Cloud Console. (You could also do this via the gcloud command-line tool). Note that the pipeline won’t do anything interesting unless you already have Tweet data in the Datastore— which would be the case if you tried the earlier example in this series— but you can still confirm that it launches and runs successfully.

Go to the Dataflow pane of Cloud Console, and click “Create Job From Template”.

Creating a Dataflow job from a template.

Select “Custom Template”, then browse to your new template’s location in GCS. This info was output when you ran create_template.py. (The pulldown menu also includes some predefined templates that you may want to explore later).

Select "Custom Template", and specify the path to the template file.

Finally, set your pipeline’s defined runtime parameter(s). In this case, we have one: timestamp. The pipeline is expecting a value in a format like this:
2017-10-22 10:18:13.491543 (you can generate such a string in the Python interpreter via str(datetime.datetime.now())).

Set your pipeline's runtime parameter(s) before running the job.

While we don’t show it here, you can extend your templates with additional metadata so that custom parameters may be validated when the template is executed.

Once you click ‘Run Job’, you should be able to see your job running in Cloud Console.

Using an App Engine app to periodically launch Dataflow jobs (and fetch Tweets)

Now that we’ve checked that we can successfully launch a Dataflow job using our template, we’ll define an App Engine app handler to launch such jobs via the Dataflow job template REST API, and run that handler periodically via a GAE cron. We’ll use another handler of the same app to periodically fetch tweets and store them in the Datastore.

You can see the GAE app script here.
The FetchTweets handler fetches tweets and stores them in the Datastore. See the previous post in this series for a bit more info on that. However, this part of the app is just for example purposes; in your own apps, you probably already have some other means of collecting and storing data in Datastore.

The LaunchJob handler is the new piece of the puzzle: using the Dataflow REST API, it sets the timestamp runtime parameter, and launches a Dataflow job using the template.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
...
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)

    BODY = {
            "jobName": "{jobname}".format(jobname=JOBNAME),
            "gcsPath": "gs://{bucket}/templates/{template}".format(
                bucket=BUCKET, template=TEMPLATE),
            "parameters": {"timestamp": str(datetime.datetime.utcnow())},
             "environment": {
                "tempLocation": "gs://{bucket}/temp".format(bucket=BUCKET),
                "zone": "us-central1-f"
             }
        }

    dfrequest = service.projects().templates().create(
        projectId=PROJECT, body=BODY)
    dfresponse = dfrequest.execute()
    logging.info(dfresponse)
    self.response.write('Done')

Launching the Dataflow pipeline periodically using a GAE cron

For our GAE app, we want to launch a Dataflow templated job every few hours, where each job analyzes the tweets from the past few days, providing a ‘moving window’ of analysis. So, it makes sense to set things using a cron.yaml file like this:

cron:
- description: fetch tweets
  url: /timeline
  schedule: every 17 minutes
  target: default
- description: launch dataflow pipeline
  url: /launchtemplatejob
  schedule: every 5 hours
  target: default

A GAE app makes it easy to run such a cron, but note that now that we’re using templates, it becomes easier to to support this functionality in other ways too. E.g., it would also be straightforward to use the gcloud CLI to launch the template job, and set up a local cron job.

A look at the example results in BigQuery

Once our example app is up and running, it periodically runs a Dataflow job that writes the results of its analysis to BigQuery. (It would also be straightforward to write results to the Datastore if that makes more sense for your workflow – or to write to multiple sources).

With BigQuery, it is easy to run some fun queries on the data. For example, we can find recent word co-occurrences that are ‘interesting’ by our metric:

"Interesting" word co-occurrences

Or we can look for emergent word pairs, that have become ‘interesting’ in the last day or so (compare April and Oct 2017 results):

Emergent (new) interesting word co-occurrences can reflect current news

We can contrast the ‘interesting’ word pairs with the words that are simply the most popular within a given period (you can see that most of these words are common, but not particularly newsworthy):

Popular, but not necessarily interesting words

Or, find the most frequently tweeted URLs from the past few weeks (some URLs are truncated in the output):

The most frequently tweeted URLs from the past few weeks (filtering out some of the shortlinks)

Summary… and what’s next?

In this post, we’ve looked at how you can programmatically launch Dataflow pipelines — that read from Datastore — using Cloud Dataflow job templates, and call the Dataflow REST API from an App Engine app. See the example app’s README for more detail on how to configure and run the app yourself.

Dataflow’s expressive programming model makes it easy to build and support a wide range of scalable processing and analytics tasks. With templates, it becomes much easier to launch pipeline jobs — you don’t have to recompile every time you execute, or worry about your environment and dependencies. And it’s more straightforward for less technical users to launch template-based pipelines.

We hope you find the example app useful as a starting point towards defining new pipeline templates and running your own analytics — via App Engine apps or otherwise. We look forward to hearing more about what you build!