我用python sdk编写自定义接收器。我尝试将数据存储到AWS S3。要连接S3,需要一些凭据,秘密密钥,但是出于安全原因,在代码中进行设置不是很好。我想使环境变量作为环境变量到达Dataflow工作人员。我该怎么做?
我是 Beam 新手,正在努力寻找许多好的指南和资源来学习最佳实践。
我注意到的一件事是有两种定义管道的方法:
with beam.Pipeline() as p:
# pipeline code in here
Run Code Online (Sandbox Code Playgroud)
或者
p = beam.Pipeline()
# pipeline code in here
result = p.run()
result.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)
是否存在首选每种方法的特定情况?
Beam 的强大功能来自于它先进的窗口功能,但它也有点令人困惑。
在本地测试中看到一些奇怪的地方(我使用rabbitmq作为输入源),其中消息并不总是被发送ack,并且固定的窗口并不总是关闭,我开始挖掘StackOverflow和Beam代码库。
似乎对于何时设置确切的水印存在特定于源的问题:
(和别的)。此外,似乎存在与 相对的Checkpoints ( s) 的独立概念。CheckpointMarkWatermarks
所以我认为这是一个由多部分组成的问题:
ack删除。编辑:我使用 beam.io.WriteToBigQuery 并打开了接收器实验选项来使其工作。我实际上已经打开了它,但我的问题是我试图从包含在 str() 中的两个变量(数据集+表)“构建”完整的表引用。这是将整个值提供程序参数数据作为字符串,而不是调用 get() 方法来仅获取值。
我正在尝试生成一个数据流模板,然后从 GCP 云功能进行调用。(作为参考,我的数据流作业应该读取一个包含一堆文件名的文件,然后从 GCS 读取所有这些文件并将其写入 BQ )。因此,我需要以这种方式编写它,以便我可以使用运行时值提供程序来传递 BigQuery 数据集/表。
我的帖子底部是我目前的代码,省略了一些与问题无关的内容。特别注意 BQ_flexible_writer(beam.DoFn) - 这就是我尝试“自定义”beam.io.WriteToBigQuery 的地方,以便它接受运行时值提供程序。
我的模板生成得很好,当我在不提供运行时变量(依赖于默认值)的情况下测试运行管道时,它会成功,并且在控制台中查看作业时我会看到添加的行。但是,在检查 BigQuery 时没有数据(三次检查日志中的数据集/表名称是否正确)。不确定它去了哪里或者我可以添加什么日志记录来了解元素发生了什么?
你知道这里发生了什么吗?或者关于如何使用运行时变量写入 BigQuery 的建议?我什至可以按照将 beam.io.WriteToBigQuery 包含在 DoFn 中的方式来调用它,还是必须采用 beam.io.WriteToBigQuery 背后的实际代码并使用它?
#=========================================================
class BQ_flexible_writer(beam.DoFn):
def __init__(self, dataset, table):
self.dataset = dataset
self.table = table
def process(self, element):
dataset_res = self.dataset.get()
table_res = self.table.get()
logging.info('Writing to table: {}.{}'.format(dataset_res,table_res))
beam.io.WriteToBigQuery(
#dataset= runtime_options.dataset,
table = str(dataset_res) + '.' + str(table_res),
schema = SCHEMA_ADFImpression,
project = str(PROJECT_ID), #options.display_data()['project'],
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, …Run Code Online (Sandbox Code Playgroud) python runtime google-cloud-platform google-cloud-dataflow apache-beam
场景 - 使用共享 VPC 在项目 A 上运行 Dataflow 作业以使用宿主项目 B 的区域和子网
在服务帐户上,我对项目 A 和 B 都有以下权限
Compute Admin
Compute Network User
Dataflow Admin
Cloud Dataflow Service Agent
Editor
Storage Admin
Serverless VPC Access Admin
Run Code Online (Sandbox Code Playgroud)
但我仍然收到这个错误
Workflow failed. Causes: Error: Message: Required 'compute.subnetworks.get' permission for 'projects/<host project>/regions/us-east1/subnetworks/<subnetwork name>' HTTP Code: 403
Run Code Online (Sandbox Code Playgroud)
我在这里缺少什么?或者这应该有什么其他许可?感谢您查看这个。
dataflow google-cloud-platform google-cloud-dataflow google-vpc
使用流式插入和 Python SDK 2.23 写入 BigQuery 时,我遇到了意外的性能问题。
如果没有写入步骤,管道将在一个工作线程上运行,CPU 约为 20-30%。添加 BigQuery 步骤后,管道可扩展到 6 个工作线程,所有工作线程的 CPU 利用率为 70-90%。
我对 Dataflow 和 Beam 还很陌生,可能这种行为是正常的,或者我做错了什么,但在我看来,使用 6 台机器每秒向 BigQuery 写入 250 行有点繁重。我想知道如何才能达到每秒 100K 行的插入配额。
我的管道如下所示:
p
| "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=options.pubsub_subscription) # ~40/s
| "Split messages" >> beam.FlatMap(split_messages) # ~ 400/s
| "Prepare message for BigQuery" >> beam.Map(prepare_row)
| "Filter known message types" >> beam.Filter(filter_message_types) # ~ 250/s
| "Write to BigQuery" >> beam.io.WriteToBigQuery(
table=options.table_spec_position,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
additional_bq_parameters=additional_bq_parameters,
)
Run Code Online (Sandbox Code Playgroud)
尽管我在不使用流引擎的情况下经历了类似的行为,但管道使用这些选项运行。
--enable_streaming_engine \ …Run Code Online (Sandbox Code Playgroud) 由于批处理模式下的数据流作业,我想将消息发布到具有某些属性的 Pub/Sub 主题。
我的数据流管道是用 python 3.8 和 apache-beam 2.27.0 编写的
它与@Ankur解决方案一起使用:/sf/answers/3907700121/
但我认为使用共享的 Pub/Sub 客户端可能会更有效:/sf/answers/3908379821/
然而发生了错误:
return StockUnpickler.find_class(self, module, name) AttributeError: 无法从 '/usr/local/lib/python3.8/site-packages/dataflow_worker/start 获取 <module 'dataflow_worker.start' 上的属性 'PublishFn'。 py'>
问题:
我的数据流管道:
import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud.pubsub_v1 import PublisherClient
import json
import argparse
import re
import logging
class PubsubClient(PublisherClient):
def __reduce__(self):
return self.__class__, (self.batch_settings,)
# The DoFn to perform on each element in the input …Run Code Online (Sandbox Code Playgroud) python-3.x google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam
我的一项 Dataflow 作业遇到一些问题。我有时会收到此错误消息。似乎在这个错误之后,作业一直运行良好,但是,今天晚上它实际上卡住了,或者它开始缓慢地处理元素。您还可以从屏幕截图中看到,工作人员开始表现得非常奇怪,如下面屏幕截图中的 CPU 使用率图表所示。
Error message from worker:
generic::aborted: SDK harness sdk-0-1 disconnected.
generic::aborted: SDK harness sdk-0-1 disconnected.
generic::aborted: SDK harness sdk-0-1 disconnected.
generic::aborted: SDK harness sdk-0-1 disconnected.
passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631 generic::aborted: SDK harness sdk-0-1 disconnected.
generic::aborted: SDK harness sdk-0-1 disconnected.
passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631 generic::aborted: SDK harness sdk-0-1 disconnected.
passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631 generic::aborted: SDK harness sdk-0-1 disconnected.
passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631 generic::aborted: SDK harness sdk-0-1 disconnected.
generic::aborted: SDK harness sdk-0-1 disconnected.
passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631 generic::aborted: …Run Code Online (Sandbox Code Playgroud) 我正在编写一个 Splittable DoFn 来读取 MongoDB 更改流。它允许我观察描述集合更改的事件,并且只要 oplog 有足够的历史记录,我就可以在我想要的任意集群时间戳处开始读取。集群时间戳是自纪元以来的秒数与给定秒内操作的序列号相结合。
我看过 SDF 的其他示例,但到目前为止我所看到的所有示例都假设有一个“可查找”数据源(Kafka 主题分区、Parquet/Avro 文件等)
MongoDB 公开的接口是一个简单的 Iterable,因此我无法真正seek获得精确的偏移量(除了在时间戳之后开始获取新的 Iterable 之外),并且它生成的事件只有集群时间戳 - 同样,没有与输出关联的精确偏移量元素。
为了配置 SDF,我使用以下类作为输入元素类型:
public static class StreamConfig implements Serializable {
public final String databaseName;
public final String collectionName;
public final Instant startFrom;
...
}
Run Code Online (Sandbox Code Playgroud)
作为限制,我使用的是OffsetRange因为我可以将这些时间戳转换为长值并返回。对于偏移跟踪器,我选择了 GrowableOffsetRangeTracker,因为它可以处理潜在的无限范围。
我在提出范围结束估计器时遇到了问题 - 最后我假设now()将是最大潜在时间戳,因为我们可以读取流的最快速度是实时的。
@GetInitialRestriction
public OffsetRange getInitialRestriction(@Element StreamConfig element) {
final int fromEpochSecond =
(int) (Optional.ofNullable(element.startFrom).orElse(Instant.now()).getMillis() / 1000);
final BsonTimestamp bsonTimestamp = new BsonTimestamp(fromEpochSecond, 0);
return new OffsetRange(bsonTimestamp.getValue(), Long.MAX_VALUE); …Run Code Online (Sandbox Code Playgroud) 我正在遵循本教程,了解如何将数据从 Oracle 数据库迁移到 Cloud SQL PostreSQL 实例。
我正在使用 Google 提供的流模板数据流到 PostgreSQL
从高层次来看,这是预期的:
当 Avro 文件上传到云存储位置时,该作业确实被触发,但当我检查目标 PostgreSQL 数据库时,所需的数据尚未填充。
当我检查作业日志和工作日志时,没有错误日志。当作业被触发时,这些是记录的日志:
StartBundle: 4
Matched 1 files for pattern gs://BUCKETNAME/ora2pg/DEMOAPP_DEMOTABLE/2022/01/11/20/03/7e13ac05aa3921875434e51c0c0c63aaabced31a_oracle-backfill_336860711_1_0.avro
FinishBundle: 5
Run Code Online (Sandbox Code Playgroud)
有谁知道问题是什么?是配置问题吗?如果需要的话我会发布所需的配置。
如果没有,有人可以帮助我如何正确调试这个特定的数据流作业吗?谢谢
编辑1:
在检查管道中步骤的步骤信息时,发现以下内容:
第一步 (DatastreamIO) 似乎按预期工作,“输出集合”中的元素计数器数量正确,为 2。

然而,在第二步中,在输出集合中找不到这两个元素计数器。进一步检查,可以看到元素似乎在以下步骤中被删除(格式化为 Postgres DML > 格式化为 Postgres DML > 映射):

编辑2:
编辑3:
为了调试这个问题,我从源代码中单独构建并部署了模板。我发现代码可以运行到以下行DatabaseMigrationUtils.java:
return KV.of(jsonString, dmlInfo);
Run Code Online (Sandbox Code Playgroud)
其中 …
apache-beam ×8
python ×3
dataflow ×1
google-vpc ×1
java ×1
mongodb ×1
postgresql ×1
python-3.x ×1
runtime ×1