Kafka Stream计算时间窗口未报告零值

Mar*_*lik 3 java apache-kafka apache-kafka-streams

我正在使用Kafka流来计算使用跳跃时间窗口在过去3分钟内发生的事件数:

public class ViewCountAggregator {

    void buildStream(KStreamBuilder builder) {      

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
        KStream<String, Long> viewCount = views
            .groupBy((key, value) -> value)
            .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
            .toStream()
            .map((key, value) -> new KeyValue<>(key.key(), value));

        viewCount.to(stringSerde, longSerde, "streams-view-count-output");        
    }

    public static void main(String[] args) throws Exception {                   
        // some not so important initialization code
        ...  
    }

}
Run Code Online (Sandbox Code Playgroud)

运行使用者并将某些消息推送到输入主题时,随着时间的推移,它会收到以下更新:

single  1
single  1
single  1
five    1
five    4
five    5
five    4
five    1
Run Code Online (Sandbox Code Playgroud)

这几乎是正确的,但它永远不会收到更新:

single  0
five    0
Run Code Online (Sandbox Code Playgroud)

没有它,我的消费者更新计数器将永远不会在没有更长时间的事件时将其设置为零.我期待消费的消息看起来像这样:

single  1
single  1
single  1
single  0
five    1
five    4
five    5
five    4
five    1
five    0
Run Code Online (Sandbox Code Playgroud)

是否有一些我错过的配置选项/参数可以帮助我实现这样的行为?

Mat*_*Sax 6

这几乎是正确的,但它永远不会收到更新:

首先,计算的输出正确的.

第二,为什么它是正确的:

如果应用窗口化聚合,则仅创建具有实际内容的窗口(我熟悉的所有其他系统将生成相同的输出).因此,如果对于某些键,没有比窗口大小更长的时间段的数据,则没有实例化窗口,因此也根本没有计数.

如果没有内容则不实例化窗口的原因很简单:处理器无法知道所有密钥.在您的示例中,您有两个键,但稍后可能会出现第三个键.你期望<thirdKey,0>从一开始就得到吗?此外,由于数据流本质上是无限的,因此密钥可能会消失并且永远不会再出现.如果你还记得所有看过的键,并且<key,0>如果没有关键消失的数据就会发出,你会<key,0>永远发光吗?

我不想说你的预期结果/语义没有意义.它只是您的一个非常具体的用例,并不适用于一般情况.因此,流处理器不实现它.

第三:你能做什么?

有多种选择:

  1. 您的消费者可以跟踪它看到的密钥,并使用嵌入的记录时间戳确定密钥是否"丢失",然后将该密钥设置为零(为此,它也可能有助于删除map步骤和保留Windowed<K>密钥的类型,以便消费者获取记录所属窗口的信息)
  2. #transform()在Stream应用程序中添加有状态步骤,其执行与(1)中所述相同的操作.为此,注册标点回调可能会有所帮助.

方法(2)应该更容易追踪键,你可以附加一个状态存储到您的转换步骤,因此不需要处理状态(和失败/恢复)在下游消费.

然而,这两种方法的棘手部分仍然是决定何时缺少一个键,即你等到你产生多长时间<key,0>.请注意,数据可能迟到(也就是乱序),即使您发出<key,0>迟到的记录,也可能您的代码发出记录生成一条<key,1>消息.但也许这对你的情况来说并不是一个问题,因为它似乎只能使用最新的窗口.<key,0>

最后但并非最不重要的一条评论:似乎您只使用最新的计数,而较新的窗口会覆盖下游消费者中的旧窗口.因此,可能值得探索"交互式查询"以count直接进入运营商的状态而不是消费者主题并更新其他状态.这可能会让您重新设计并简化下游应用程序.查看文档和一篇关于Interactive Queries的非常好的博客文章,了解更多详情.