Thi*_*ijs 3 python-3.x google-bigquery google-cloud-dataflow apache-beam
在我的管道中,我使用 WriteToBigQuery 是这样的:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
Run Code Online (Sandbox Code Playgroud)
这将返回文档中描述的 Dict,如下所示:
beam.io.WriteToBigQuery PTransform 返回一个字典,其 BigQueryWriteFn.FAILED_ROWS 条目包含所有未能写入的行的 PCollection。
如何打印此 dict 并将其转换为 pcollection 或如何仅打印 FAILED_ROWS?
如果我做: | "print" >> beam.Map(print)
然后我得到: AttributeError: 'dict' object has no attribute 'pipeline'
我一定读过一百个管道,但在 WriteToBigQuery 之后我再也没有看到任何东西。
[编辑] 当我完成管道并将结果存储在一个变量中时,我有以下内容:
{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}
Run Code Online (Sandbox Code Playgroud)
但我不知道如何在管道中使用这个结果,如下所示:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)
Run Code Online (Sandbox Code Playgroud)
Gui*_*ins 11
处理无效输入的死信是一种常见的 Beam/Dataflow 用法,适用于 Java 和 Python SDK,但后者的示例并不多。
想象一下,我们有一些虚拟输入数据,其中包含 10 行好行和不符合表模式的坏行:
schema = "index:INTEGER,event:STRING"
data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
Run Code Online (Sandbox Code Playgroud)
然后,我要做的是命名写入结果(events在这种情况下):
events = (p
| "Create data" >> beam.Create(data)
| "CSV to dict" >> beam.ParDo(CsvToDictFn())
| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
"{0}:dataflow_test.good_lines".format(PROJECT),
schema=schema,
)
)
Run Code Online (Sandbox Code Playgroud)
然后访问FAILED_ROWS侧面输出:
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
Run Code Online (Sandbox Code Playgroud)
这适用DirectRunner于 BigQuery 并将好的行写入 BigQuery:
和坏的本地文件:
$ cat error_log.txt-00000-of-00001
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})
Run Code Online (Sandbox Code Playgroud)
如果您使用 运行它,DataflowRunner您将需要一些额外的标志。如果您遇到TypeError: 'PDone' object has no attribute '__getitem__'错误,则需要添加--experiments=use_beam_bq_sink以使用新的 BigQuery 接收器。
如果你得到KeyError: 'FailedRows'它,那是因为新接收器将默认为批处理管道加载 BigQuery 作业:
STREAMING_INSERTS、FILE_LOADS 或 DEFAULT。关于将数据加载到 BigQuery 的介绍:https : //cloud.google.com/bigquery/docs/loading-data。DEFAULT 将在流管道上使用 STREAMING_INSERTS,在批处理管道上使用 FILE_LOADS。
您可以通过指定method='STREAMING_INSERTS'in来覆盖行为WriteToBigQuery:
DirectRunner和DataflowRunner 这里的完整代码。
| 归档时间: |
|
| 查看次数: |
1915 次 |
| 最近记录: |