从TextIO到BigQuery使用无限制的PCollection时,数据会滞留在BigQueryIO内部的Reshuffle / GroupByKey中

Flo*_*ert 6 google-bigquery apache-beam

我正在使用TextIO从Cloud Storage中读取内容。因为我想让作业连续运行,所以我使用了watchForNewFiles。

为了完整起见,如果我使用有界的PCollections(批处理模式下没有watchForNewFiles和BigQueryIO),则读取的数据可以正常工作,因此没有数据问题。

我有p.run()。waitUntilFinish(); 在我的代码中,因此管道在运行。而且它不会给出任何错误。

Apache Beam版本为2.8.0

PCollection<String> stream =
        p.apply("Read File", TextIO
                .read()
                .from(options.getInput())
                .watchForNewFiles(
                        Duration.standardMinutes(1),
                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
                )
                .withCompression(Compression.AUTO));
Run Code Online (Sandbox Code Playgroud)

这样可以很好地工作,并在文件可用时立即读取文件。PCollection是无界的,并且包含这些文件中的文本行。

经过一些转换

PCollection<List<String>> lines = stream.apply("Parse CSV",
        ParDo.of(new ParseCSV())
);

PCollection<TableRow> rows = lines.apply("Convert to BQ",
        ParDo.of(new BigQueryConverter(schema))
);
Run Code Online (Sandbox Code Playgroud)

ParseCSV步骤通过outputWithTimestamp将时间戳添加到其接收者。

我最终得到了准备好流向BigQuery的TableRows的PCollection。为此,我用

WriteResult result = rows.apply("WriteToBigQuery",
        BigQueryIO.
                <TableRow>write()
                .withFormatFunction(input -> input)
                .withSchema(bqSchema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withExtendedErrorInfo()
                .to(options.getOutput())

);
Run Code Online (Sandbox Code Playgroud)

这永远不会将数据写入BigQuery。如果看一下UI,我会看到BigQueryIO

  • ShardTableWrites
  • TagWithUniqueId
  • 改组
    • Window.into
    • GroupByKey

数据进入和离开前两个步骤。但是绝对不要改组。这只会读取数据,而不会继续传递数据。Reshuffle内部的导致GroupByKey的步骤。

由于集合是无界的,因此我尝试使用

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
);
Run Code Online (Sandbox Code Playgroud)

这将强制执行GroupByKey的所有操作在10秒后释放窗口。但事实并非如此。

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
        .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(0))
        .discardingFiredPanes()
);
Run Code Online (Sandbox Code Playgroud)

在处理时间上添加特定触发器也无济于事。有什么线索吗?提前致谢!

tad*_*ade 2

一种解决方法可能是(对我有用)为每个元素分配一个新键,并强制数据流通过 Reshuffle 或 GroupByKey 解耦转换。

streams.apply(WithKeys.of(input -> 1)).setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
       .apply(Reshuffle.of())
       .apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
           @Override
           public String apply(KV<Integer, String> input) {
               return input.getValue();
           }
       }))
       .apply("convertToTableRow", ...)
       .apply("WriteToBigQuery", ...)
Run Code Online (Sandbox Code Playgroud)

密钥可以是像示例中那样的常量,也可以是随机的。如果选择随机,则必须将范围设置得足够小以适应 JVM 内存。喜欢ThreadLocalRandom.current().nextInt(0, 5000)