数据流:字符串到发布订阅消息

IoT*_*ser 2 java google-cloud-dataflow apache-beam

我正在尝试在 Dataflow 中进行单元测试。

对于那个测试,在乞求时,我将从一个简单的硬编码字符串开始。

问题是我需要将该字符串转换为 pubsub 消息。我得到了以下代码来做到这一点:

    // Create a PCollection from string a transform to pubsub message format
    PCollection<PubsubMessage> input = p.apply("input string", Create.of("test" + 
            ""))
            .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(new PubsubMessage(c.element().getBytes(), null));
                }
            }));
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

 java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    <...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    ... 50 more
Run Code Online (Sandbox Code Playgroud)

我应该如何从字符串创建 pubsub 消息?

Dan*_*ira 7

在用户 ParDos 的可序列化要求下的Beam 编程指南中,它提到了这一点:

使用匿名内部类实例内联声明函数对象时要小心。在非静态上下文中,您的内部类实例将隐式包含指向封闭类和该类状态的指针。该封闭类也将被序列化,因此适用于函数对象本身的相同考虑也适用于该外部类。

发生的事情是您的匿名 DoFn 隐式包含一个指向您正在构建管道的类的指针,这导致了此序列化失败。您可以通过将 DoFn 设为命名子类而不是匿名来避免这种情况:

public class MyDoFn extends DoFn<String, PubsubMessage>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new PubsubMessage(c.element().getBytes(), null));
  }
}
Run Code Online (Sandbox Code Playgroud)