从云功能启动云数据流

Sam*_*ety 5 google-cloud-dataflow google-cloud-functions

如何从Google Cloud功能启动Cloud Dataflow作业?我想使用Google Cloud Functions作为启用跨服务合成的机制.

Sam*_*ety 9

我在下面列出了一个非常基本的WordCount示例示例.请注意,您需要在Cloud Function部署中包含Java二进制文件的副本,因为它不在默认环境中.同样,您还需要使用Cloud Function打包部署jar.

module.exports = {
  wordcount: function (context, data) {
    const spawn = require('child_process').spawn;
    const child = spawn(
            'jre1.8.0_73/bin/java',
            ['-cp',
             'MY_JAR.jar',
             'com.google.cloud.dataflow.examples.WordCount',
             '--jobName=fromACloudFunction',
             '--project=MY_PROJECT',
             '--runner=BlockingDataflowPipelineRunner',
             '--stagingLocation=gs://STAGING_LOCATION',
             '--inputFile=gs://dataflow-samples/shakespeare/*',
             '--output=gs://OUTPUT_LOCATION'
            ],
            { cwd: __dirname });

    child.stdout.on('data', function(data) {
      console.log('stdout: ' + data);
    });
    child.stderr.on('data', function(data) {
      console.log('error: ' + data);
    });
    child.on('close', function(code) {
      console.log('closing code: ' + code);
    });
    context.success();
  }
}
Run Code Online (Sandbox Code Playgroud)

您可以通过使用非阻塞运行程序并使该函数返回作业ID 来进一步增强此示例,以便您可以单独轮询作业完成.此模式也应该对其他SDK有效,只要它们的依赖关系可以打包到云功能中.