这部分是对Apache Kafka Streams中特定分区上的聚合的后续操作
假设我有一个名为“事件”的主题,它带有3个分区,在该分区上发送字符串->整数数据,如下所示:
(Bob,3)在分区1上
(Sally,4)在分区2上
(Bob,2)在分区3上
...
我想在所有分区上聚合值(在此示例中,只是一个简单的总和),最终得到一个KTable类似于以下内容的:
(莎莉,4)
(鲍勃,5岁)
正如我在上面链接的问题的答案中所提到的,不可能直接进行这种跨分区聚合。但是,答复者提到,如果消息具有相同的密钥,则是可能的(在这种情况下,这是正确的)。如何做到这一点?
我还希望能够从在Kafka Streams应用程序的每个实例之间复制的“全局”状态存储中查询这些聚合值。
我的第一个想法是使用GlobalKTable(根据此页面,我相信这应该是我所需要的)。但是,此状态存储的changelog主题具有与原始“事件”主题相同的分区数量,并且仅基于每个分区而不是跨所有分区进行聚合。
这是我的应用程序的精简版-不确定从何处去:
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey()
.aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);
final KafkaStreams streams = …Run Code Online (Sandbox Code Playgroud) 所以我试图在我的Aurelia应用程序中使用Select2.我安装了Select2 jspm install select2,在我的app.html文件中我需要使用Select2 <require from="select2/js/select2.min.js"></require>.浏览器加载缩小的JS文件,但由于某种原因它也尝试加载
http:// localhost:3003/jspm_packages/github/select2/select2@4.0.0/js/select2.min .html.
为什么Aurelia试图加载我在<require>元素中指定的同一JS文件的HTML副本?我怎样才能解决这个问题?
谢谢
我的Kafka Streams应用程序通常需要大约100毫秒的时间,从发送消息到结果是在不同的主题上发送响应消息的时间.我可以使用哪些配置选项或最佳实践来最小化延迟?