小编Rob*_*old的帖子

聚集在Kafka Streams中的多个分区上

这部分是对Apache Kafka Streams中特定分区上的聚合的后续操作

假设我有一个名为“事件”的主题,它带有3个分区,在该分区上发送字符串->整数数据,如下所示:

(Bob,3)在分区1上

(Sally,4)在分区2上

(Bob,2)在分区3上

...

我想在所有分区上聚合值(在此示例中,只是一个简单的总和),最终得到一个KTable类似于以下内容的:

(莎莉,4)

(鲍勃,5岁)

正如我在上面链接的问题的答案中所提到的,不可能直接进行这种跨分区聚合。但是,答复者提到,如果消息具有相同的密钥,则是可能的(在这种情况下,这是正确的)。如何做到这一点?

我还希望能够从在Kafka Streams应用程序的每个实例之间复制的“全局”状态存储中查询这些聚合值。

我的第一个想法是使用GlobalKTable(根据此页面,我相信应该是我所需要的)。但是,此状态存储的changelog主题具有与原始“事件”主题相同的分区数量,并且仅基于每个分区而不是跨所有分区进行聚合。

这是我的应用程序的精简版-不确定从何处去:

final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
        .groupByKey()
        .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);

final KafkaStreams streams = …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

6
推荐指数
1
解决办法
1613
查看次数

Aurelia试图从Select2加载HTML?

所以我试图在我的Aurelia应用程序中使用Select2.我安装了Select2 jspm install select2,在我的app.html文件中我需要使用Select2 <require from="select2/js/select2.min.js"></require>.浏览器加载缩小的JS文件,但由于某种原因它也尝试加载

http:// localhost:3003/jspm_packages/github/select2/select2@4.0.0/js/select2.min .html.

为什么Aurelia试图加载我在<require>元素中指定的同一JS文件的HTML副本?我怎样才能解决这个问题?

谢谢

javascript jquery-select2 aurelia jquery-select2-4

4
推荐指数
1
解决办法
2731
查看次数

如何最大限度地减少Kafka Streams应用程序的延迟?

我的Kafka Streams应用程序通常需要大约100毫秒的时间,从发送消息到结果是在不同的主题上发送响应消息的时间.我可以使用哪些配置选项或最佳实践来最小化延迟?

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1940
查看次数