小编Hem*_*gle的帖子

Kafka Stream Scala API性能下降

我正在探索Kafka Streams API并在Scala和Java中实现示例WordCountExample,但是当我运行Scala代码时,与Java示例相比,它提供的输出速度非常慢.

另外我观察到当我在wordCountInput主题上创建记录时,它立即出现在sinkWordCount主题中,但是如果我们运行Scala代码但是如果我们运行Java代码,那么wordCount的结果不会立即显示在wordCountOutput主题中,但是如果我们运行Java代码那么一切正常,我可以看到wordCountOutput主题中的输出立即生效.

Java代码:

public class WordCountExample {

        public static void main(String[] args) {
            Properties p = new Properties();
            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092");
            p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> textLines = builder.stream("wordCountInput");
            KTable<String, String> wordCounts = textLines
                    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\+W")))
                    .groupBy((key, word) -> word)
                    .count()
                    .mapValues((key, value) -> value.toString());

            textLines.to("sinkWordCount");

            wordCounts.toStream().to("wordCountOutput");

            KafkaStreams …
Run Code Online (Sandbox Code Playgroud)

java scala apache-kafka apache-kafka-streams

6
推荐指数
0
解决办法
495
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1

java ×1

scala ×1