我正在探索Kafka Streams API并在Scala和Java中实现示例WordCountExample,但是当我运行Scala代码时,与Java示例相比,它提供的输出速度非常慢.
另外我观察到当我在wordCountInput主题上创建记录时,它立即出现在sinkWordCount主题中,但是如果我们运行Scala代码但是如果我们运行Java代码,那么wordCount的结果不会立即显示在wordCountOutput主题中,但是如果我们运行Java代码那么一切正常,我可以看到wordCountOutput主题中的输出立即生效.
Java代码:
public class WordCountExample {
public static void main(String[] args) {
Properties p = new Properties();
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092");
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("wordCountInput");
KTable<String, String> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\+W")))
.groupBy((key, word) -> word)
.count()
.mapValues((key, value) -> value.toString());
textLines.to("sinkWordCount");
wordCounts.toStream().to("wordCountOutput");
KafkaStreams …Run Code Online (Sandbox Code Playgroud)