我想用KStream接口批量消息.
我有一个带有键/值的Stream我试图在翻滚窗口中收集它们,然后我想立即处理完整的窗口.
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
.aggregateByKey(
HashMap::new,
(aggKey, value, aggregate) -> {
aggregate.put(value.getUuid, value);
return aggregate;
},
TimeWindows.of("intentWindow", 100),
longSerde, mapSerde)
.foreach((wk, values) -> {
Run Code Online (Sandbox Code Playgroud)
事情是每次更新到KTable时调用foreach.一旦完成,我想处理整个窗口.如从100毫秒收集数据,然后立即处理.在每个.
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to …Run Code Online (Sandbox Code Playgroud) 我有一个KStream我想要计算事件的某些方面.我按如下方式做到:
KTable<Windowed<Long>, Counter> ret = input.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
.aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));
Run Code Online (Sandbox Code Playgroud)
我希望有一个新KStream的聚合作为事件.我可以这样轻松地做到:
ret.toStream().to("output");
Run Code Online (Sandbox Code Playgroud)
问题是"输入"主题中的每个事件都会产生一个"输出"主题的事件.我想仅在窗口完成时将事件发布到输出主题.例如,如果窗口是一分钟,则每分钟每个键发送一个事件.
我想我可以这样做:
ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));
Run Code Online (Sandbox Code Playgroud)
但我想知道是否有更好/更优雅的方式呢?
我已经在Kafka流应用程序中编写了以下代码:
KGroupedStream<String, foo> groupedStream = stream.groupByKey();
groupedStream.windowedBy(
SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
.aggregate(() -> {...})
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()...
Run Code Online (Sandbox Code Playgroud)
关闭窗口后,应该(如果我正确理解的话)每个键都发出记录。不知何故,行为如下:
该流不会发出第一条记录,仅使用不同的Key才将其转发到第二条记录之后,然后仅在第三条记录之后发出第二条记录,依此类推。
我已经尝试使用“ exactly_once”使用多个StreamConfigs,并且无论是否具有缓存,这种现象仍然存在。
在此先感谢您的帮助 !
我有一个事件流,我想根据时间窗口聚合这些事件。我的解决方案提供增量聚合,而不是在定时窗口上进行聚合。我已经读过这对于流来说是正常的,因为它会将结果作为更改日志。同样在研究期间,我遇到了 Kafka Streams DSL 的 2 步窗口聚合以及如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?. 但是第一篇文章中的解决方案有些过时(使用已弃用的 API)。我使用了那些已弃用的 API 中建议的新 API。这是我的解决方案
KStream<String, Event> eventKStream = summarizableData.mapValues(v -> v.getEvent());
KGroupedStream<String, Event> kGroupedStream = eventKStream.groupBy((key, value) -> {
String groupBy = getGroupBy(value, criteria);
return groupBy;
}, Serialized.with(Serdes.String(), eventSerde));
long windowSizeMs = TimeUnit.SECONDS.toMillis(applicationProperties.getWindowSizeInSeconds());
final TimeWindowedKStream<String, Event> groupedByKeyForWindow = kGroupedStream
.windowedBy(TimeWindows.of(windowSizeMs)
.advanceBy(windowSizeMs));
Run Code Online (Sandbox Code Playgroud)
但是,正如我之前所解释的,我的结果不是在特定时间窗口中给出的,而是作为增量聚合给出的。我需要我的数据按照 windowSize 中给出的指定时间输出。我也读到CACHE_MAX_BYTES_BUFFERING_CONFIG可以控制输出,但我需要一些可靠的解决方案适用于每种情况。另请注意,https: //cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows wiki 中给出的模式现在已经过时,因为它使用旧的 API。(我使用的是 kafka-streams 1.1.0 版本)
我正在使用 Kafka Streams 的 TopologyTestDriver 来测试我们的数据管道。
它对我们所有的简单拓扑(包括使用 Stores 的有状态拓扑)都非常有效。我的问题是当我尝试使用此测试驱动程序来测试使用窗口聚合的拓扑时。
我复制了一个简单的示例,该示例对 10 秒窗口内使用相同密钥接收到的整数求和。
public class TopologyWindowTests {
TopologyTestDriver testDriver;
String INPUT_TOPIC = "INPUT.TOPIC";
String OUTPUT_TOPIC = "OUTPUT.TOPIC";
@Before
public void setup(){
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// EventProcessor is a <String,String> processor
// so we set those serders
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
testDriver = new TopologyTestDriver(defineTopology(),config,0L);
}
/**
* topology test
*/
@Test
public void testTopologyNoCorrelation() throws IOException {
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), …Run Code Online (Sandbox Code Playgroud) 我有一个KTable,其数据看起来像这样(key => value),其中key是客户ID,值是包含一些客户数据的小JSON对象:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
Run Code Online (Sandbox Code Playgroud)
我想对这个KTable进行一些聚合,并且基本上保持每个记录的数量age_group.所需的KTable数据如下所示:
"18-24" => 3
"25-30" => 1
Run Code Online (Sandbox Code Playgroud)
让我们说Alice,谁在18-24上面的小组中,有一个生日,让她进入新的年龄组.支持第一个KTable的状态存储现在应该如下所示:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 …Run Code Online (Sandbox Code Playgroud) 我想知道是否有任何方法可以使用 Kafka Streams DSL 或 Processor API 对窗口内的记录进行排序。
以以下情况为例(任意一种,但与我需要的类似):
有一些事件的 Kafka 主题,比如说用户点击。假设主题有 10 个分区。消息按键进行分区,但每个键都是唯一的,因此它是一种随机分区。每条记录都包含一个用户 ID,稍后会使用该用户 ID 对流进行重新分区。
我们消费流,并将每条消息发布到另一个主题,根据用户 ID 对记录进行分区(按用户 ID 重新分区原始流)。
然后我们消费这个重新分区的流,我们将消费的记录存储在窗口化 10 分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区,但顺序没有保证,因为原始主题有10个分区。
我了解Kafka Streams的窗口模型,当新记录进来时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需要能够对缓冲进行排序事件,并按顺序将它们发送到另一个主题。
注意:
我们需要能够使用处理时间而不是事件时间来刷新/处理窗口内的记录。我们不能等待下一次点击来提前时间,因为它可能永远不会发生。
我们需要从商店中删除所有记录,尽快对窗口进行排序和刷新。
如果应用程序崩溃,我们需要恢复(在应用程序的同一个或另一个实例中)并处理所有未处理的窗口,而不需要等待特定用户的新记录。
我知道 Kafka Streams 1.0.0 允许在处理 API 中使用挂钟时间,但我不确定实现我需要的正确方法是什么(更重要的是考虑到上述恢复过程要求)。
我的Kafka Streams聚合读取了一个紧凑的主题并执行此操作:
(0_10, ..), (0_11, ..) ---> (0, [10]) (0, [10, 11])
我想知道如何控制聚合时间窗口,因此它不会为每个传入的消息发送消息,而是等待并聚合其中的一些消息.Imagine Stream App使用以下消息:
(0_10, ..)(1_11, ..)(0_13, ..)如果以前的3条消息在短时间内到达,我希望看到:
(0,[10])(0, [10, 13])(1, [11])我无法弄清楚,在吐出新值之前,如何告诉我的Kafka Stream应用程序等待更多聚合需要多长时间.
我的代码非常简单
builder
.table(keySerde, valueSerde, sourceTopic)
.groupBy(StreamBuilder::groupByMapper)
.aggregate(
StreamBuilder::aggregateInitializer,
StreamBuilder::aggregateAdder,
StreamBuilder::aggregateSubtractor)
.to(...);
Run Code Online (Sandbox Code Playgroud)
目前,它有时会批量聚合,但不确定如何调整它:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
Run Code Online (Sandbox Code Playgroud) 所以实际上我已经为此苦苦挣扎了几天。我正在使用 4 个主题的记录。我需要在 TimedWindow 上聚合记录。时间到了,我想向接收器主题发送已批准的消息或未批准的消息。这可能与 kafka 流有关吗?
即使窗口仍然打开,它似乎将每条记录下沉到新主题,这真的不是我想要的。
这是简单的代码:
builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(),
Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String,
FooTriggerMessage>("", Serdes.String(),
fooTriggerSerde))
.filter((key, value) -> value.getTriggerEventId() != null)
.groupBy((key, value) -> value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), fooTriggerSerde))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))
.aggregate(() -> new BarApprovalMessage(), /* initializer */
(key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
Materialized
.<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
storeName) /* state store name */
.withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(),
Produced.with(windowedSerde, barApprovalSerde));
Run Code Online (Sandbox Code Playgroud)
截至目前,每条记录都被下沉到传出主题,我只希望它在窗口关闭时发送一条消息,可以这么说。
这可能吗?
我开始使用 KStream 来使用来自现有主题的数据。
我只对在 10 秒窗口内获取给定 ID 的最后一个事件感兴趣。我尝试使用以下代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
Run Code Online (Sandbox Code Playgroud)
但我最终得到了所有事件,而不仅仅是最后一个。使用 KStream 可以做我想做的事吗?