小编lau*_*rds的帖子

如何在Python中创建从Pub / Sub到GCS的数据流管道

我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)中累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。

我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。

这是到目前为止我所获得的(Apache Beam Python SDK 2.10.0):

import apache_beam as beam

TOPIC_PATH="projects/<my-project>/topics/<my-topic>"

def CombineFn(e):
    return "\n".join(e)

o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = ( p | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
       | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
       | "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
       | "Output" >> beam.io.WriteToText("<GCS path or local path>"))

res = p.run()
res.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

我在本地环境中运行该程序没有问题。

python main.py
Run Code Online (Sandbox Code Playgroud)

它在本地运行,但可以从指定的Pub / Sub主题读取,并且每隔30秒就会按预期写入指定的GCS路径。

但是现在的问题是,当我在Google Cloud Platform(即Cloud Dataflow)上运行它时,它不断发出神秘的异常。

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-pubsub google-cloud-dataflow apache-beam

7
推荐指数
1
解决办法
1157
查看次数