将 TupleTag 传递给 DoFn 方法

Mat*_*ias 2 java google-cloud-dataflow apache-beam

我正在尝试从 DoFn 方法获得两个输出,遵循Apache Beam 编程指南的示例

基本上在示例中,您传递一个 TupleTag,然后指定在哪里进行输出,这对我有用,问题是我在 ParDo 中调用了一个外部方法,但不知道如何传递这个 TupleTag,这是我的代码:

PCollectionTuple processedData = pubEv
  .apply("Processing", ParDo.of(new HandleEv())
      .withOutputTags(mainData, TupleTagList.of(failedData)));
Run Code Online (Sandbox Code Playgroud)

HandleEv 方法:

static class HandleEv extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      c.output("test")
      c.output(failedData,"failed")
    }
}
Run Code Online (Sandbox Code Playgroud)

我得到的错误是cannot find symbol由于 failedData 不能从 HandleEv 访问,我试图在课程开始时声明 failedData 但也不起作用。

非常感谢

Ben*_*ers 6

您可以像将值传递给任何其他对象一样执行此操作——将其作为参数传递给 的构造函数HandleEv并将其存储在字段中:

static class HandleEv extends DoFn<String, String> {
  private final TupleTag<String> failedData;
  public HandleEv(TupleTag<String> failedData) {
    this.failedData = failedData;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    c.output("test")
    c.output(failedData,"failed")
  }
}
Run Code Online (Sandbox Code Playgroud)

然后像这样使用它:

PCollectionTuple processedData = pubEv
  .apply("Processing", ParDo.of(new HandleEv(failedData))
      .withOutputTags(mainData, TupleTagList.of(failedData)));
Run Code Online (Sandbox Code Playgroud)