mik*_*ike 6 apache-kafka rocksdb apache-kafka-streams
我认为Kafka Streams可以提供帮助,但是我找不到有帮助的文档/示例.
我找到了一个类似的问题,但它没有任何实现建议(我目前丢失的地方):Kafka Streams等待依赖对象的功能
我想做的事:
我想将Kafka主题中的相关记录关联到单个对象中并生成该新对象.例如,可能有5个消息记录通过唯一键相互关联 - 我想从这些相关对象构建一个新对象,并将其生成到新队列.
我希望所有相关事件都能在一小时内消耗掉.卡夫卡将其描述为滑动窗口.一旦ID为"123"的消息记录A到达消费者,应用程序必须至少等待一小时才能到达ID为"123"的剩余记录.在所有记录到达或一小时后,它们都是过期记录.
最后,一小时内收集的所有相关消息都用于创建新的Object,然后发送到另一个Kafka队列.
我遇到的问题.
Kafka中的滑动窗口似乎只有在将两个流连接在一起时才能工作.我们只有一个流连接到主题 - 我不知道为什么需要两个流或我们将如何实现这一点.我在网上找不到这个例子.我在Kafka中看到的所有流函数在收集相同密钥的事件时简单地聚合/缩减为简单值.例如,键出现的次数或累加某些值
这里有一些伪代码来描述我在说什么.如果功能存在,函数名称/语义将会不同.
KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)
Run Code Online (Sandbox Code Playgroud)
问题(谢谢你的帮助):
| 归档时间: |
|
| 查看次数: |
1294 次 |
| 最近记录: |