监控 WriteToBigQuery

Thi*_*ijs 3 python-3.x google-bigquery google-cloud-dataflow apache-beam

在我的管道中,我使用 WriteToBigQuery 是这样的:

| beam.io.WriteToBigQuery(
     'thijs:thijsset.thijstable',
      schema=table_schema,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
Run Code Online (Sandbox Code Playgroud)

这将返回文档中描述的 Dict,如下所示:

beam.io.WriteToBigQuery PTransform 返回一个字典,其 BigQueryWriteFn.FAILED_ROWS 条目包含所有未能写入的行的 PCollection。

如何打印此 dict 并将其转换为 pcollection 或如何仅打印 FAILED_ROWS?

如果我做: | "print" >> beam.Map(print)

然后我得到: AttributeError: 'dict' object has no attribute 'pipeline'

我一定读过一百个管道,但在 WriteToBigQuery 之后我再也没有看到任何东西。

[编辑] 当我完成管道并将结果存储在一个变量中时,我有以下内容:

{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}
Run Code Online (Sandbox Code Playgroud)

但我不知道如何在管道中使用这个结果,如下所示:

| beam.io.WriteToBigQuery(
     'thijs:thijsset.thijstable',
      schema=table_schema,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)
Run Code Online (Sandbox Code Playgroud)

Gui*_*ins 11

处理无效输入的死信是一种常见的 Beam/Dataflow 用法,适用于 Java 和 Python SDK,但后者的示例并不多。

想象一下,我们有一些虚拟输入数据,其中包含 10 行好行和不符合表模式的坏行:

schema = "index:INTEGER,event:STRING"

data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
Run Code Online (Sandbox Code Playgroud)

然后,我要做的是命名写入结果(events在这种情况下):

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
 )
Run Code Online (Sandbox Code Playgroud)

然后访问FAILED_ROWS侧面输出:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
Run Code Online (Sandbox Code Playgroud)

这适用DirectRunner于 BigQuery 并将好的行写入 BigQuery:

在此处输入图片说明

和坏的本地文件:

$ cat error_log.txt-00000-of-00001 
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})
Run Code Online (Sandbox Code Playgroud)

如果您使用 运行它,DataflowRunner您将需要一些额外的标志。如果您遇到TypeError: 'PDone' object has no attribute '__getitem__'错误,则需要添加--experiments=use_beam_bq_sink以使用新的 BigQuery 接收器。

如果你得到KeyError: 'FailedRows'它,那是因为新接收器将默认为批处理管道加载 BigQuery 作业:

STREAMING_INSERTS、FILE_LOADS 或 DEFAULT。关于将数据加载到 BigQuery 的介绍:https : //cloud.google.com/bigquery/docs/loading-data。DEFAULT 将在流管道上使用 STREAMING_INSERTS,在批处理管道上使用 FILE_LOADS。

您可以通过指定method='STREAMING_INSERTS'in来覆盖行为WriteToBigQuery

在此处输入图片说明

DirectRunnerDataflowRunner 这里的完整代码。