应用不适用于使用 Apache Beam 的 ParDo 和 DoFn

Dr *_*use 1 google-cloud-dataflow apache-beam

我正在实施 Pub/Sub 到 BigQuery 管道。它看起来类似于How to create read transform using ParDo and DoFn in Apache Beam,但在这里,我已经创建了一个 PCollection。

我正在遵循Apache Beam 文档中描述的内容来实现 ParDo 操作以使用以下管道准备表行:

static class convertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        // Retrieve data from message
        String rawData = message.getData();
        Instant timestamp = new Instant(new Date());
        // Prepare TableRow
        TableRow row = new TableRow().set("message", rawData).set("ts_reception", timestamp);
        c.output(row);
    }
}

// Read input from Pub/Sub
pipeline.apply("Read from Pub/Sub",PubsubIO.readMessagesWithAttributes().fromTopic(topicPath))
        .apply("Prepare raw data for insertion", ParDo.of(new convertToTableRowFn()))
        .apply("Insert in Big Query", BigQueryIO.writeTableRows().to(BQTable));
Run Code Online (Sandbox Code Playgroud)

我发现在该DOFN功能要点

我不断收到以下错误:

The method apply(String, PTransform<? super PCollection<PubsubMessage>,OutputT>) in the type PCollection<PubsubMessage> is not applicable for the arguments (String, ParDo.SingleOutput<PubsubMessage,TableRow>)
Run Code Online (Sandbox Code Playgroud)

我一直都明白 ParDo/DoFn 操作是按元素进行的 PTransform 操作,我错了吗?我从来没有在 Python 中遇到过这种类型的错误,所以我对为什么会发生这种情况感到有些困惑。

Ant*_*ton 5

你是对的,ParDos是元素明智的转换,你的方法看起来是正确的。

你看到的是编译错误。当apply()java 编译器推断的方法的参数类型与实际输入的类型不匹配时,就会发生这样的事情,例如convertToTableRowFn.

从您看到的错误来看,java 推断出第二个参数apply()的类型为PTransform<? super PCollection<PubsubMessage>,OutputT>,而您正在传递的子类是ParDo.SingleOutput<PubsubMessage,TableRow>(您的convertToTableRowFn)。看看SingleOutput你的定义convertToTableRowFn基本上是一个PTransform<PCollection<? extends PubsubMessage>, PCollection<TableRow>>. 并且 java 无法apply在它期望的地方使用它PTransform<? super PCollection<PubsubMessage>,OutputT>

看起来可疑的是 java 没有推断出OutputTto PCollection<TableRow>。如果您有其他错误,则无法这样做的原因之一。你确定你没有其他错误吗?

例如,当我尝试执行此操作时,看着convertToTableRowFn您正在调用的调用message.getData()不存在,并且在那里编译失败。在我来说,我需要做这样的事情,而不是:rawData = new String(message.getPayload(), Charset.defaultCharset())。还.to(BQTable))需要一个字符串(例如表示 BQ 表名称的字符串)作为参数,并且您正在传递一些未知符号BQTable(尽管它可能存在于您的程序中的某个地方,但在您的情况下这不是问题)。

在我修复这两个错误后,您的代码为我编译,apply()完全推断并且类型兼容。