Triggering Dataflow Pipelines With Cloud Functions
Love cronjobs? No? Oh. Good! You can combine Google Cloud Functions with a Dataflow Pipeline to make them a thing of the past, not to mention their VMs.
Do you have an unreasonable fear of cronjobs? Find spinning up VMs to be a colossal waste of your towering intellect? Does the thought of checking a folder regularly for updates fill you with an apoplectic rage? If so, you should probably get some help. Maybe find another line of work.
In the meantime, here’s one way to ease your regular file processing anxieties. With just one application of Google Cloud Functions, eased gently up your Dataflow Pipeline, you can find lasting relief from troublesome cronjobs.
But first, some assumptions.
Assumptions?
You’ve got gcloud installed, you’ve created a project, and you’ve enabled the Dataflow and Cloud Functions APIs. If you haven’t, then a few minutes of reading the Google docs should get you started.
I’m Ready Now
We are going to set up a Google Cloud Function that will get called every time a cloud storage bucket gets updated. That function will kick off a dataflow pipeline (using the handy new templates) to process the file with whatever complicated data transformations somebody further up the food chain has specified.
Hang on, You’ve Lost Me
OK, maybe I should have been a bit more explicit in my assumptions. Here’s a quick cheat sheet:
- Cloud functions: serverless little bits of code (like Amazon’s Lambdas)
- Dataflow: a managed service for processing large amounts of data
- Cloud storage: Google’s equivalent of S3
OK, Carry on, We’re Good
The first step is to create a few buckets (you can call them whatever you want, but these are the names I’ve used in the rest of this article):
- One for keeping the code for your cloud function: cloud-fn-staging
- One for keeping the code for your pipeline: pipeline-staging
- The bucket you want to monitor for updates: incoming-files
If you’re not sure how to create buckets, Google’s docs can help. Or just start clicking around randomly in the cloud console. You’ll work it out. I believe in you, champ.
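If you’d rather do it from the command line, gsutil can create them for you. Something like this should work (bucket names are globally unique, so you’ll probably need to add your own prefix to the names used here):

gsutil mb gs://cloud-fn-staging
gsutil mb gs://pipeline-staging
gsutil mb gs://incoming-files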
Now What?
Now we’re going to make a Dataflow Template. If you’ve already got a pipeline, you can use that one, but I mangled one of the example pipelines that Google gives you. There’s a little fiddly detail here: Templates were introduced in version 1.9.0 of the Dataflow Java libraries, so you’ll need at least that version. However, if you go for the latest 2.x version (which is also the default version that the maven archetypes generate), the way to create your template changes. But the Google docs haven’t been updated for that version.
To create the example pipeline, I ran the following maven command:
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.shinesolutions \
-DartifactId=dataflow-template-poc \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.shinesolutions
Notice the -DarchetypeVersion=1.9.0 option, which ensures that I’m using that version of the libs. Without it, you’ll end up with the 2.x versions (which I will explain how to use as well, don’t worry).
This generates some sample code, including the Dataflow standard WordCount. I edited this to make it templatable. I also invented the word templatable, just now. You can use it, though; it’s cool. Here are the important changes, first in the WordCountOptions interface (in the WordCount class):
public interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Validation.Required
  ValueProvider<String> getInputFile();
  void setInputFile(ValueProvider<String> value);

  @Description("Path of the file to write to")
  @Validation.Required
  ValueProvider<String> getOutputFile();
  void setOutputFile(ValueProvider<String> value);
}
Instead of the options just being String values, they are now ValueProvider types. This lets the Runner know that these values will be provided later.
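As an aside, if you want to exercise the pipeline without a template (in a quick local experiment, say), the SDK also provides a StaticValueProvider you can plug in directly. A rough, untested sketch, with made-up paths:

WordCountOptions testOptions = PipelineOptionsFactory.as(WordCountOptions.class);
testOptions.setInputFile(ValueProvider.StaticValueProvider.of("gs://some-bucket/input.txt"));
testOptions.setOutputFile(ValueProvider.StaticValueProvider.of("gs://some-bucket/counts"));

For templates, though, you leave the values unset and let the runner fill them in at launch time.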
The main method looks like this:
public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()).withoutValidation())
      .apply(new CountWords())
      .apply(ParDo.of(new FormatAsTextFn()))
      .apply(TextIO.Write.named("WriteCounts").to(options.getOutputFile()).withoutValidation());

  p.run();
}
The important thing to note here is the .withoutValidation() modifier on TextIO.Read. If you don’t do that, your template won’t get created, because TextIO will try to validate the option values before they’ve been supplied. We’re going to use TextIO in this example, but PubSub and BigQuery input/output in Dataflow also support passing in options from templates.
To create our template, run this command:
mvn compile exec:java \
-Dexec.mainClass=com.shinesolutions.WordCount \
-Dexec.args="--project=<your project> \
--stagingLocation=gs://pipeline-staging \
--dataflowJobFile=gs://pipeline-staging/templates/WordCount \
--runner=TemplatingDataflowPipelineRunner"
If that succeeded, your template has been created. Remember the value you supplied for the dataflowJobFile option; you’ll need it in your cloud function.
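You can double-check that the template file actually landed where you expect with a quick listing:

gsutil ls gs://pipeline-staging/templates/

You should see a WordCount object in there.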
If you chose to use the 2.x version of the libraries, you probably got an error. I did tell you not to, but you knew better, didn’t you? The command for 2.x looks like this:
mvn compile exec:java \
-Dexec.mainClass=com.shinesolutions.WordCount \
-Dexec.args="--project=<your project> \
--stagingLocation=gs://pipeline-staging \
--templateLocation=gs://pipeline-staging/templates/WordCount \
--runner=DataflowRunner"
I haven’t tested this with version 2.x, so don’t blame me if that command deletes the Internet or something.
Template's Done. What Next?
Now you need your cloud function. Cloud functions are written in JavaScript running on Node.js, so you should probably install that. Then, in a suitable directory, run npm init to set up a package.json file, which will tell Google what your dependencies are. It will ask you a lot of questions, but don’t stress about the answers; they’re not a big deal.
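For reference, once the dependency from the next step is installed, the package.json will look roughly like this (the name, description, and version numbers here are just illustrative):

{
  "name": "cloud-fn-dataflow-test",
  "version": "1.0.0",
  "description": "Kicks off a Dataflow template when a GCS bucket changes",
  "main": "index.js",
  "dependencies": {
    "googleapis": "^19.0.0"
  }
}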
Our cloud function is going to talk to the Dataflow API, so you’ll need to install that dependency. Run npm install --save googleapis to get that done. (Confusingly, Google has two flavours of Node.js support; the @google-cloud packages don’t cover Dataflow yet, which is why we’re using googleapis here.) The cloud function looks like this:
const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
  const file = event.data;
  // Only react to objects that exist (we also get delete notifications from the bucket).
  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function(err, authClient, projectId) {
      if (err) {
        throw err;
      }
      // Scope the default application credentials if the client requires it.
      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }
      // Build a Dataflow API client.
      const dataflow = google.dataflow({
        version: 'v1b3',
        auth: authClient
      });
      // Launch a job from the template we staged earlier, pointing it at the uploaded file.
      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}-wordcount`
          },
          jobName: 'cloud-fn-dataflow-test',
          gcsPath: 'gs://pipeline-staging/templates/WordCount'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });
    });
  }
};
We’re going to trigger our cloud function from files being uploaded to a GCS bucket, so goWithTheDataFlow gets called with an event that has a few useful properties. The main one is event.data, which contains the information about the updated resource. We check that the resource exists (because we also get notifications of deletes from the bucket). Then we authenticate (because we’re a cloud function, default application auth is all set up for you) and create a Dataflow API client. Make the call to our Dataflow template and we are done. Easy.
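For reference, the event.data payload for a change notification looks roughly like this (trimmed, with made-up values):

{
  "name": "some-file.txt",
  "bucket": "incoming-files",
  "resourceState": "exists",
  "contentType": "text/plain",
  "size": "1234"
}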
Now we upload our function to Google’s cloud with a command that looks like this:
gcloud beta functions deploy goWithTheDataFlow \
--stage-bucket cloud-fn-staging \
--trigger-bucket incoming-files
If that all went OK, we should be good to test with a file.
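You can confirm the deployment with:

gcloud beta functions list

which should show goWithTheDataFlow along with its trigger bucket.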
Awesome. How Do I Know It’s Working?
The cloud function logs go to Stackdriver Logging in your Google Cloud console. Upload a file to the bucket, and in a few seconds you should be able to see some output there. Any errors will appear here and, by default, will also trigger an email to you. You can see your Dataflow pipeline in the usual Dataflow area of the cloud console, and you can see the output files (if you used the WordCount code from above) in the same bucket as your source file.
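For a quick end-to-end test, copy any old file into the bucket (test.txt here is just a placeholder) and then read the function’s logs (the exact logs command may vary with your gcloud version):

gsutil cp test.txt gs://incoming-files/
gcloud beta functions logs read goWithTheDataFlow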
Great. What Should I Do Now?
You’ll probably want to get rid of that test cloud function at some point. Do that with a command like this:
gcloud beta functions delete goWithTheDataFlow
Then make a cup of tea. Put your feet up, close your eyes, let your mind relax and just think about all those cronjobs you can delete. Imagine grinding them under your heel, and setting fire to the VMs they run on. Better now? Good.