在从Dataflow插入BigQuery之前验证行

The*_*heo 8 google-bigquery google-cloud-dataflow

根据我们 如何从数据流加载Bigquery表时设置maximum_bad_records?maxBadRecords从Dataflow将数据加载到BigQuery时,目前无法设置配置.建议在将数据插入BigQuery之前验证Dataflow作业中的行.

如果我有TableSchema和a TableRow,我该如何确保可以安全地将行插入表中?

必须有一种更简单的方法来做到这一点,而不是迭代模式中的字段,查看它们的类型并查看行中值的类,对吧?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败.

更新:

我的用例是一个ETL作业,最初将在JSON上运行(每行一个对象)登录云存储并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery.这些对象包含很多BigQuery中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式的JSON有效负载).像时间戳这样的东西也需要格式化以与BigQuery一起使用.这个作业的一些变体会在不同的输入上运行并写入不同的表.

从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery.我或多或少只是循环遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,如果需要应用格式(这可能是下行,划分毫秒时间戳) 1000,从URL中提取主机名等),并将值写入TableRow对象.

我的问题是数据混乱.有几亿个物体有一些看起来并不像预期的那样,这种情况很少见,但是这些物品仍然很少见.有时,应包含字符串的属性包含整数,反之亦然.有时会有一个数组或一个应该有字符串的对象.

理想情况下,我想接受TableRow并通过TableSchema并询问"这有效吗?".

因为这是不可能的,所以我做的是查看TableSchema对象并尝试自己验证/转换值.如果TableSchema说属性是STRINGI 类型,则value.toString()在将其添加到之前TableRow.如果是,INTEGER我检查它是a Integer,Long或者BigInteger等等.这种方法的问题在于我只是猜测BigQuery会起什么作用.它接受哪些Java数据类型FLOAT?为了TIMESTAMP?我认为我的验证/演员表可以解决大多数问题,但总有例外和边缘情况.

根据我的经验,这是非常有限的,整个工作流程(工作?工作流程?不确定正确的术语)如果单行失败BigQuery的验证失败(就像常规加载一样,除非maxBadRecords设置为足够大的数字).它也失败了表面有用的消息,如'BigQuery导入作业"dataflow_job_xxx"失败.原因:(5db0b2cdab1557e0):项目"xxx"中的BigQuery作业"dataflow_job_xxx"已完成错误:errorResult:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:指定了JSON映射对于非记录字段,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射'.也许在哪里可以看到更详细的错误消息,可以告诉我它是哪个属性,价值是什么?没有这些信息,它也可以说"坏数据".

据我所知,至少在批处理模式下运行时,Dataflow会将TableRow对象写入云存储中的临时区域,然后在所有内容完成后启动加载.这意味着我无处可捕获任何错误,我的代码在加载BigQuery时不再运行.我还没有在流模式下运行任何工作,但是我不确定它会有什么不同,从我的(公认有限的)理解基本原理是相同的,它只是批量大小更小.

人们使用Dataflow和BigQuery,因此,如果不必担心由于单个错误输入而导致整个管道停止,就不可能完成这项工作.人们如何做到这一点?

Dan*_*rin 8

我假设您将文件中的JSON反序列化为Map<String, Object>.然后你应该能够递归地使用a进行类型检查TableSchema.

我建议使用迭代方法开发模式验证,并执行以下两个步骤.

  1. 编写一个PTransform<Map<String, Object>, TableRow>将JSON行转换为TableRow对象的文件.它TableSchema也应该是函数的构造函数参数.你可以开始使这个函数非常严格 - 要求JSON直接将输入解析为Integer,例如,当找到BigQuery INTEGER模式时 - 并积极地声明错误记录.基本上,确保在处理时超级严格不会输出无效记录.

    我们的代码在这里做了一些类似的事情 - 给定一个由BigQuery生成并作为JSON写入GCS的文件,我们递归地遍历模式并进行一些类型转换.但是,我们不需要验证,因为BigQuery本身编写了数据.

    请注意,TableSchema对象不是Serializable.我们已经通过将TableSchemain DoFnPTransform构造函数转换为JSON String并返回来解决了这个问题.请参阅使用该变量的代码BigQueryIO.javajsonTableSchema.

  2. 使用本博文中描述的"死信"策略来处理错误记录 - Map<String, Object>从PTransform 输出有问题的行并将其写入文件.这样,您可以检查稍后验证失败的行.

您可以从一些小文件开始并使用DirectPipelineRunner而不是DataflowPipelineRunner.直接运行程序在您的计算机上运行管道,而不是在Google Cloud Dataflow服务上运行,它使用BigQuery流式写入.我相信当这些写入失败时,您将获得更好的错误消息.

(我们对批处理作业使用GCS-> BigQuery负载作业模式,因为它更有效,更具成本效益,但BigQuery流在Streaming作业中写入,因为它们是低延迟的.)

最后,在记录信息方面:

  • 绝对检查Cloud Logging(按照Worker Logs日志面板上的链接).
  • 如果运行bq命令行实用程序,您可能会获得有关批处理数据流触发的加载作业失败原因的更好信息:bq show -j PROJECT:dataflow_job_XXXXXXX.

  • 当对象不可序列化时,这很烦人.我过去通过使用BigQuery API库附带的`JsonFactory`方法解决了这个问题:`TableSchema schema = JacksonFactory.getDefaultInstance().fromString(schemaStr,TableSchema.class);`还更新答案. (2认同)
  • 我还可以报告在使用您的一些建议后,我已经使用我的Dataflow作业成功地将2亿行数据加载到BigQuery中. (2认同)