在带有窗口的GroupByKey之后,光束管道不会产生任何输出

tal*_*onx 5 google-cloud-dataflow apache-flink apache-beam

我正在使用FlinkRunner在本地运行以下Beam管道代码。PubsubIO用于读取主题中的消息。

我有一个单独的线程,该线程以固定的时间间隔(每30秒)将消息发布到主题,并且还设置了“ ts”属性,该属性稍后将用于导出事件时间。

自定义转换以转换为KV对-

private static class PubSubMessageGrouper extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage element = c.element();
        KV<String, PubsubMessage> kv = KV.of(element.getAttribute("key"), element);
        c.output(kv);
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,“密钥”是发布者线程中较早的消息属性中设置的密钥。目的是通过此键将消息下游分组。

管道代码-

PCollection<PubsubMessage> pubsubColl = p
        .apply(PubsubIO.readMessagesWithAttributes()
            .withTimestampAttribute("ts")
            .fromTopic("projects/" + projectName + "/topics/beamtest")
        );


PCollection<KV<String, PubsubMessage>> idfied =
        pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));

PCollection<KV<String, PubsubMessage>> windowed = idfied
        .apply(Window.<KV<String, PubsubMessage>>into(FixedWindows.of(Duration.standardSeconds(15)))
            .triggering(
                Repeatedly.forever(
                    AfterWatermark.pastEndOfWindow()
                )
            )
            .withAllowedLateness(Duration.standardSeconds(15))
            .discardingFiredPanes());

PCollection<KV<String, Iterable<PubsubMessage>>> grouped = windowed.apply(GroupByKey.create());

grouped.apply(ParDo.of(new KVPrinter()));
Run Code Online (Sandbox Code Playgroud)

转换没有链接以便于阅读。最后,KVPrinter转换只是打印出从group by接收到的消息,一旦我运行此消息,它将随后由实际代码替换。

当我运行此命令时,我没有在相当长的时间内(几分钟或更长时间)执行触发器。当它最终触发时,我看到某些消息没有收到(在最后一步中)。这是由于PubsubIO使用的内部水印吗?我的目的是确保所有消息都在groupby中进行处理,包括允许的延迟窗口中的延迟消息。

我看了相关的问题,但没有得到很多帮助。无论我等待多长时间,使用DirectRunner运行相同的东西都不会产生任何输出。还要注意的另一点是,如果删除管道的GroupBy部分,则会触发触发器。

在GCD上运行的PubsubIO的水印启发式是什么?

带有GroupByKey的Apache Beam PubSubIO

使用默认触发器在Windows中使用无边界数据

注意:即使没有在Dataflow上测试过,我也用Dataflow进行了标记。原因是我打算将其部署在Dataflow上,也希望从正在关注该标签的Dataflow团队寻求帮助。