滑动窗口中的Kafka KStream相关消息事件

mik*_*ike 6 apache-kafka rocksdb apache-kafka-streams

我认为Kafka Streams可以提供帮助,但是我找不到有帮助的文档/示例.

我找到了一个类似的问题,但它没有任何实现建议(我目前丢失的地方):Kafka Streams等待依赖对象的功能

我想做的事:

我想将Kafka主题中的相关记录关联到单个对象中并生成该新对象.例如,可能有5个消息记录通过唯一键相互关联 - 我想从这些相关对象构建一个新对象,并将其生成到新队列.

我希望所有相关事件都能在一小时内消耗掉.卡夫卡将其描述为滑动窗口.一旦ID为"123"的消息记录A到达消费者,应用程序必须至少等待一小时才能到达ID为"123"的剩余记录.在所有记录到达或一小时后,它们都是过期记录.

最后,一小时内收集的所有相关消息都用于创建新的Object,然后发送到另一个Kafka队列.

我遇到的问题.

Kafka中的滑动窗口似乎只有在将两个流连接在一起时才能工作.我们只有一个流连接到主题 - 我不知道为什么需要两个流或我们将如何实现这一点.我在网上找不到这个例子.我在Kafka中看到的所有流函数在收集相同密钥的事件时简单地聚合/缩减为简单值.例如,键出现的次数或累加某些值

这里有一些伪代码来描述我在说什么.如果功能存在,函数名称/语义将会不同.

    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
    // One hour sliding Window
    )
    .collectAllRelatedKeys(
    // Collect all Records related to each key
    // map == HashMap<Key, ArrayList<Value>>
       map.get(key).add(value);
    )
    .transformAndProcess(
        if(ALL_EVENTS_COLLECTED) {
        // Create new Object from all related records
            newObject = 
            createNewObjectFromRelatedRecordsFunction(map.get(key));
            producer.send(newObject);   
        }
    )
Run Code Online (Sandbox Code Playgroud)

问题(谢谢你的帮助):

  1. 我怎么能用一个流的滑动窗口?
  2. 如何自定义KStream/KTable函数以收集Window中的所有相关事件并将新对象生成到另一个队列?
  3. acking/offset管理如何与Sliding Window流一起使用?
  4. 这可以保证完全一次交货吗?供参考:https://www.confluent.io/blog/enabling-exactly-kafka-streams/

Mat*_*Sax 0

Apache Kafka 2.7 中添加了对聚合的滑动窗口支持。

参见https://issues.apache.org/jira/browse/KAFKA-5636