小编Tim*_*.G.的帖子

使用KSQL计算所有条目

是否可以使用KSQL来计算特定列的条目,GROUP BY而不是通过应用程序流的所有条目聚合?

我正在寻找这样的东西:

| Count all | Count id1 | count id2 |
| ---245----|----150----|----95-----|
Run Code Online (Sandbox Code Playgroud)

或者在KSQL中更像这样:

[some timestamp] | Count all | 245   
[some timestamp] | Count id1 | 150   
[some timestamp] | Count id2 | 95   
.   
.   
.   
Run Code Online (Sandbox Code Playgroud)

谢谢
- 蒂姆

apache-kafka apache-kafka-streams ksql

4
推荐指数
1
解决办法
2103
查看次数

将消息从一个Kafka群集流式传输到另一个群集

我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个Kafka集群(远程 - >本地集群).
我们的想法是立即使用Kafka-Streams,这样我们就不需要复制本地集群上的实际消息,而只需将Kafka-Streams处理的"结果"处理到我们的Kafka-Topics中.

所以,让我们说WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的.我还在我的本地计算机上运行了Kafka-Instance.
现在我想让WordCount演示在主题("远程")上运行,其中包含应该计算单词的句子.
然而,计数应写入我本地系统的主题,而不是"远程"主题.

使用Kafka-Streams API是否可以这样做?
例如

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
    .groupBy((_, word) => word)
    .count("word-counts")

wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)

val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
Run Code Online (Sandbox Code Playgroud)

非常感谢
- 蒂姆

apache-kafka apache-kafka-streams

4
推荐指数
1
解决办法
1220
查看次数

标签 统计

apache-kafka ×2

apache-kafka-streams ×2

ksql ×1