是否可以使用KSQL来计算特定列的条目,GROUP BY而不是通过应用程序流的所有条目聚合?
我正在寻找这样的东西:
| Count all | Count id1 | count id2 |
| ---245----|----150----|----95-----|
Run Code Online (Sandbox Code Playgroud)
或者在KSQL中更像这样:
[some timestamp] | Count all | 245
[some timestamp] | Count id1 | 150
[some timestamp] | Count id2 | 95
.
.
.
Run Code Online (Sandbox Code Playgroud)
谢谢
- 蒂姆
我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个Kafka集群(远程 - >本地集群).
我们的想法是立即使用Kafka-Streams,这样我们就不需要复制本地集群上的实际消息,而只需将Kafka-Streams处理的"结果"处理到我们的Kafka-Topics中.
所以,让我们说WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的.我还在我的本地计算机上运行了Kafka-Instance.
现在我想让WordCount演示在主题("远程")上运行,其中包含应该计算单词的句子.
然而,计数应写入我本地系统的主题,而不是"远程"主题.
使用Kafka-Streams API是否可以这样做?
例如
val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic",
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("word-counts")
wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)
val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
Run Code Online (Sandbox Code Playgroud)
非常感谢
- 蒂姆