OCD*_*Dev 4 google-cloud-dataflow apache-beam
我正在 Apache Beam 中编写一个数据管道,它从 Pub/Sub 读取数据,将消息反序列化为 JSONObjects 并将它们传递到其他一些管道阶段。问题是,当我尝试提交代码时,出现以下错误:
执行 Java 类时发生异常。无法返回用于转换为 JSON 并混淆 PII 数据/ParMultiDo(JSONifyAndObfuscate).output [PCollection] 的默认编码器。更正以下根本原因之一: [错误] 未手动指定编码器;您可以使用 .setCoder() 来执行此操作。[错误] 从 CoderRegistry 推断编码器失败:无法为 org.json.JSONObject 提供编码器。[错误] 使用注册的 CoderProvider 构建 Coder 失败。[错误] 请参阅抑制的异常以了解详细的失败信息。[错误] 使用生成 PTransform 的默认输出编码器失败:调用了 PTransform.getOutputCoder。
基本上,错误表明 Beam 无法找到 org.json.JSONObject 对象的编码器。我不知道在哪里可以获得这样的编码器或如何构建一个。有任何想法吗?
谢谢!
了解编码器的最佳起点是《Beam 编程指南:数据编码和类型安全》。简而言之,编码器用于指定如何在 Beam 管道中的某些点(通常在阶段边界)将不同类型的数据编码到字节字符串或从字节字符串编码。不幸的是,默认情况下没有 JSONObjects 的编码器,因此您有两个选择:
避免在 PCollection 中创建 JSONObject。您可以从 JSON 中提取所需的数据并将其作为基本数据类型传递,或者使用自己的类封装所需的数据,而不是在整个管道中传递 JSONObject。Java 的基本数据类型都分配了默认编码器,并且可以轻松地为只是这些类型的结构的类生成编码器。作为一个附带好处,这就是 Beam 管道的构建方式,因此如果您尽可能坚持使用基本数据和知名编码器,它可能会以最佳方式工作。
如果 JSONObjects 是必要的,您将需要为它们创建一个自定义编码器。编程指南包含有关如何将自定义编码器设置为默认编码器的信息。对于实现本身,使用 JSONObject 最简单的方法是使用 JSONObject.toString 将其编码为 JSON 字符串,然后使用 JSONObject 的字符串构造函数将其从字符串中解码。有关如何执行此操作的详细信息,请查看上面的编程指南并查看Coder 文档。
| 归档时间: |
|
| 查看次数: |
2249 次 |
| 最近记录: |