KStream批处理窗口

sam*_*mst 8 apache-kafka-streams

我想用KStream接口批量消息.

我有一个带有键/值的Stream我试图在翻滚窗口中收集它们,然后我想立即处理完整的窗口.

builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
                .aggregateByKey(
                        HashMap::new,
                        (aggKey, value, aggregate) -> {
                            aggregate.put(value.getUuid, value);
                            return aggregate;
                        },
                        TimeWindows.of("intentWindow", 100),
                        longSerde, mapSerde)
                .foreach((wk, values) -> {
Run Code Online (Sandbox Code Playgroud)

事情是每次更新到KTable时调用foreach.一旦完成,我想处理整个窗口.如从100毫秒收集数据,然后立即处理.在每个.

16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6
Run Code Online (Sandbox Code Playgroud)

在某些时候,新窗口从地图中的1个条目开始.所以我甚至不知道窗户什么时候满了.

任何在kafka流中批处理的提示

sam*_*mst 5

我的实际任务是将更新从流推送到redis,但即使redis速度很快,我也不想单独读取/更新/写入.我现在的解决方案是使用KStream.process()提供一个处理器,该处理器在进程中添加队列并实际处理队列中的队列.

public class BatchedProcessor extends AbstractProcessor{

...
BatchedProcessor(Writer writer, long schedulePeriodic)

@Override
public void init(ProcessorContext context) {
    super.init(context);
    context.schedule(schedulePeriodic);
}

@Override
public void punctuate(long timestamp) {
    super.punctuate(timestamp);
    writer.processQueue();
    context().commit();
}

@Override
public void process(Long aLong, IntentUpdateEvent intentUpdateEvent) {
    writer.addToQueue(intentUpdateEvent);
}
Run Code Online (Sandbox Code Playgroud)

我仍然需要测试,但它解决了我遇到的问题.人们可以很容易地以非常通用的方式编写这样的处理器.API非常整洁干净但是processBatched((List batchedMessaages) - > ...,timeInterval或countInterval)只是使用punctuate来处理批处理并在那时提交并在Store中收集KeyValues可能是一个有用的补充.

但也许它的目的是通过一个处理器解决这个问题,并将API纯粹保留在一个消息中,同时保持低延迟焦点.


Mic*_*oll 4

现在(从 Kafka 0.10.0.0 / 0.10.0.1 开始):您描述的窗口行为“按预期工作”。也就是说,如果您收到 1,000 条传入消息,您(目前)将始终看到 1,000 条更新通过最新版本的 Kafka / Kafka Streams 发送到下游。

展望未来:Kafka 社区正在开发新功能,以使这种更新率行为更加灵活(例如,允许您上面描述的所需行为)。有关更多详细信息,请参阅KIP-63:统一流中的存储和下游缓存


归档时间:

查看次数:

2759 次

最近记录:

8 年,9 月 前