我正在使用TextIO从Cloud Storage中读取内容。因为我想让作业连续运行,所以我使用了watchForNewFiles。
为了完整起见,如果我使用有界的PCollections(批处理模式下没有watchForNewFiles和BigQueryIO),则读取的数据可以正常工作,因此没有数据问题。
我有p.run()。waitUntilFinish(); 在我的代码中,因此管道在运行。而且它不会给出任何错误。
Apache Beam版本为2.8.0
PCollection<String> stream =
p.apply("Read File", TextIO
.read()
.from(options.getInput())
.watchForNewFiles(
Duration.standardMinutes(1),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
)
.withCompression(Compression.AUTO));
Run Code Online (Sandbox Code Playgroud)
这样可以很好地工作,并在文件可用时立即读取文件。PCollection是无界的,并且包含这些文件中的文本行。
经过一些转换
PCollection<List<String>> lines = stream.apply("Parse CSV",
ParDo.of(new ParseCSV())
);
PCollection<TableRow> rows = lines.apply("Convert to BQ",
ParDo.of(new BigQueryConverter(schema))
);
Run Code Online (Sandbox Code Playgroud)
ParseCSV步骤通过outputWithTimestamp将时间戳添加到其接收者。
我最终得到了准备好流向BigQuery的TableRows的PCollection。为此,我用
WriteResult result = rows.apply("WriteToBigQuery",
BigQueryIO.
<TableRow>write()
.withFormatFunction(input -> input)
.withSchema(bqSchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(options.getOutput())
);
Run Code Online (Sandbox Code Playgroud)
这永远不会将数据写入BigQuery。如果看一下UI,我会看到BigQueryIO
数据进入和离开前两个步骤。但是绝对不要改组。这只会读取数据,而不会继续传递数据。Reshuffle内部的导致GroupByKey的步骤。
由于集合是无界的,因此我尝试使用
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
);
Run Code Online (Sandbox Code Playgroud)
这将强制执行GroupByKey的所有操作在10秒后释放窗口。但事实并非如此。
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
.withAllowedLateness(Duration.standardSeconds(0)) …Run Code Online (Sandbox Code Playgroud)