我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)中累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。
我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。
这是到目前为止我所获得的(Apache Beam Python SDK 2.10.0):
import apache_beam as beam
TOPIC_PATH="projects/<my-project>/topics/<my-topic>"
def CombineFn(e):
return "\n".join(e)
o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = ( p | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
| "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
| "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
| "Output" >> beam.io.WriteToText("<GCS path or local path>"))
res = p.run()
res.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)
我在本地环境中运行该程序没有问题。
python main.py
Run Code Online (Sandbox Code Playgroud)
它在本地运行,但可以从指定的Pub / Sub主题读取,并且每隔30秒就会按预期写入指定的GCS路径。
但是现在的问题是,当我在Google Cloud Platform(即Cloud Dataflow)上运行它时,它不断发出神秘的异常。
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most …
Run Code Online (Sandbox Code Playgroud) python google-cloud-pubsub google-cloud-dataflow apache-beam