相关疑难解决方法(0)

如何发送时间窗口KTable的最终kafka-streams聚合结果?

我想做的是:

  1. 从数字主题(Long的)消费记录
  2. 聚合(计数)每个5秒窗口的值
  3. 将FINAL聚合结果发送到另一个主题

我的代码看起来像这样:

KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
        .to("long-counts");
Run Code Online (Sandbox Code Playgroud)

看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题.我的问题是如何只发送每个窗口的最终聚合结果?

apache-kafka apache-kafka-streams

31
推荐指数
2
解决办法
1万
查看次数

Apache Kafka订单根据其值来窗口化消息

我正在尝试找到一种方法来重新排序主题分区中的消息并将有序消息发送到新主题.

我有Kafka发布者发送以下格式的String消息: {system_timestamp}-{event_name}?{parameters}

例如:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
Run Code Online (Sandbox Code Playgroud)

此外,我们为每条消息添加一些消息密钥,以将它们发送到相应的分区.

我想要做的是根据消息的{system-timestamp}部分重新排序事件,并在1分钟的窗口内,因为我们的发布者不保证将根据{system-timestamp}值发送消息.

例如,我们可以向主题提供首先具有更大{system-timestamp}值的消息.

我已经调查了Kafka Stream API并找到了一些关于消息窗口化和聚合的例子:

Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

 KStreamBuilder builder = new KStreamBuilder();
 KStream<String, String> stream = builder.stream("events");
 KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.

    /* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
                () -> "", …
Run Code Online (Sandbox Code Playgroud)

java messaging stream apache-kafka

6
推荐指数
1
解决办法
2206
查看次数