Aad*_*day 1 google-bigquery google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam
我有一个程序,它在 pubSub 中创建一个主题,并向该主题发布消息。我还有一个自动数据流作业(使用模板),它将这些消息保存到我的 BigQuery 表中。现在我打算用 python 管道替换基于模板的作业,其中我的要求是从 PubSub 读取数据,应用转换并将数据保存到 BigQuery/发布到另一个 PubSub 主题。我开始用 python 编写脚本,并进行了大量的试验和错误来实现它,但令我沮丧的是,我无法实现它。代码如下所示:
import apache_beam as beam
from apache_beam.io import WriteToText
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"
def run():
o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
print("I reached here")
# # Read from PubSub into a PCollection.
data = (
p
| "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
)
data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
print("Lines: ", data)
run()
Run Code Online (Sandbox Code Playgroud)
如果我能尽早获得一些帮助,我将非常感激。注意:我在谷歌云上设置了我的项目,并且我的脚本在本地运行。
这是工作代码。
import apache_beam as beam
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"
class PrintValue(beam.DoFn):
def process(self, element):
print(element)
return [element]
def run():
o = beam.options.pipeline_options.PipelineOptions()
# Replace this by --stream execution param
standard_options = o.view_as(beam.options.pipeline_options.StandardOptions)
standard_options.streaming = True
p = beam.Pipeline(options=o)
print("I reached here")
# # Read from PubSub into a PCollection.
data = p | beam.io.ReadFromPubSub(topic=TOPIC_PATH) | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
# Don't forget to run the pipeline!
result = p.run()
result.wait_until_finish()
run()
Run Code Online (Sandbox Code Playgroud)
总之
--streaming
,或者以编程方式执行,如我的代码所示请注意,流模式意味着在 PubSub 上无限期地收听。如果您在 Dataflow 上运行此命令,您的管道将始终处于运行状态,直到您停止它。如果您的消息很少,这可能会花费昂贵的费用。确保这是目标模型
另一种方法是在有限的时间内使用管道(您使用调度程序来启动它,并使用另一个调度程序来停止它)。但是,此时,你必须堆叠消息。这里您使用 aTopic
作为管道的入口。此选项强制 Beam 创建临时订阅并侦听此订阅上的消息。这意味着在创建此订阅之前发布的消息将不会被接收和处理。
这个想法是创建一个订阅,通过这种方式,消息将堆叠在其中(默认情况下最多 7 天)。然后,在管道条目中使用订阅名称beam.io.ReadFromPubSub(subscription=SUB_PATH)
。消息将由 Beam 拆栈并处理(不保证顺序!)
归档时间: |
|
查看次数: |
4403 次 |
最近记录: |