IoT*_*ser 2 java google-cloud-dataflow apache-beam
我正在尝试在 Dataflow 中进行单元测试。
对于那个测试,在乞求时,我将从一个简单的硬编码字符串开始。
问题是我需要将该字符串转换为 pubsub 消息。我得到了以下代码来做到这一点:
// Create a PCollection from string a transform to pubsub message format
PCollection<PubsubMessage> input = p.apply("input string", Create.of("test" +
""))
.apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new PubsubMessage(c.element().getBytes(), null));
}
}));
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
<...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
... 50 more
Run Code Online (Sandbox Code Playgroud)
我应该如何从字符串创建 pubsub 消息?
在用户 ParDos 的可序列化要求下的Beam 编程指南中,它提到了这一点:
使用匿名内部类实例内联声明函数对象时要小心。在非静态上下文中,您的内部类实例将隐式包含指向封闭类和该类状态的指针。该封闭类也将被序列化,因此适用于函数对象本身的相同考虑也适用于该外部类。
发生的事情是您的匿名 DoFn 隐式包含一个指向您正在构建管道的类的指针,这导致了此序列化失败。您可以通过将 DoFn 设为命名子类而不是匿名来避免这种情况:
public class MyDoFn extends DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new PubsubMessage(c.element().getBytes(), null));
}
}
Run Code Online (Sandbox Code Playgroud)