Kafka Stream Scala API performance degradation

Hem*_*gle 6 java scala apache-kafka apache-kafka-streams

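I am exploring the Kafka Streams API and have implemented the sample WordCountExample in both Scala and Java, but when I run the Scala code it produces its output much more slowly than the Java example.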

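I also observed that when I produce a record to the wordCountInput topic, it appears immediately in the sinkWordCount topic. However, when I run the Scala code, the word-count results do not show up in the wordCountOutput topic right away, whereas with the Java code everything works as expected and I can see the output in the wordCountOutput topic immediately.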

Java code:

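import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountExample {

    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092");
        p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("wordCountInput");
        KTable<String, String> wordCounts = textLines
                // split each line into words on runs of non-word characters
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count()
                .mapValues((key, value) -> value.toString());

        // pass-through copy of the raw input lines
        textLines.to("sinkWordCount");

        // running count per word, with the count converted to a String
        wordCounts.toStream().to("wordCountOutput");

        KafkaStreams streams = new KafkaStreams(builder.build(), p);
        streams.cleanUp();
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
        }));
    }
}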
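Both listings split each line with a regular expression inside flatMapValues, and that pattern decides which word keys reach groupBy/count. A minimal sketch using only the JDK (the class WordSplitDemo and the method tokenize are hypothetical names) runs the same toLowerCase().split(...) call outside Kafka: "\\W+" splits on runs of non-word characters, while a transposed pattern such as "\\+W" matches only the literal text "+W" and leaves the whole line as a single token.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: exercises the word-splitting step without starting Kafka.
final class WordSplitDemo {

    // Lower-cases a line and splits it with the given regex,
    // mirroring textLine.toLowerCase().split(regex) in the topology.
    static List<String> tokenize(String line, String regex) {
        return Arrays.asList(line.toLowerCase().split(regex));
    }

    public static void main(String[] args) {
        String line = "Hello, Kafka Streams!";
        // "\\W+" splits on runs of non-word characters: [hello, kafka, streams]
        System.out.println(tokenize(line, "\\W+"));
        // "\\+W" only matches the literal text "+W", so the whole line stays one token
        System.out.println(tokenize(line, "\\+W"));
    }
}
```

Only the first pattern yields one record per word; with the second, each distinct input line would be counted as if it were a single "word".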

POM file for the Java program:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany</groupId>
    <artifactId>KafkaStreamExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
</project>

Scala code:

import java.util.Properties

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object KafkaStreamExample extends App {
  // implicit Serde instances (String, Long, ...) used by the Scala DSL
  import Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    p
  }
  val builder: StreamsBuilder = new StreamsBuilder()
  val sourceStream: KStream[String, String] = builder.stream[String, String]("wordCountInput")

  // split each line into words, group by word and count occurrences
  val wordCounts: KTable[String, String] = sourceStream
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((key, word) => word)
    .count()
    .mapValues((key, value) => value.toString)

  wordCounts.toStream.to("wordCountOutput")

  // pass-through copy of the raw input lines
  sourceStream.to("sinkWordCount")

  val stream: KafkaStreams = new KafkaStreams(builder.build(), props)

  sys.ShutdownHookThread {
    stream.close()
  }

  stream.cleanUp()
  stream.start()
}

Scala SBT:

name := "KafkaStreamExample"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts( Artifact("javax.ws.rs-api", "jar", "jar"))

Kafka version: kafka_2.11-2.0.0

So can anyone tell me why the Scala example does not produce its output as quickly as the Java example?

Update

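Setting the property cache.max.bytes.buffering to 0 in the Scala code makes the WordCount output appear immediately, but according to this Stack Overflow answer, doing so is not recommended in production. In the Java code we do not need to set this property, yet Java still produces its output quickly.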
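For context on the update: in the Kafka Streams DSL, updates from a KTable aggregation such as count() pass through a record cache and are forwarded downstream when that cache is flushed. A flush happens on every commit (commit.interval.ms, 30000 ms by default when exactly-once is off) or when the cache (cache.max.bytes.buffering, 10485760 bytes by default) has to evict entries. A plain KStream.to(), like the sinkWordCount branch, is not cached. A minimal sketch, assuming the configuration is built as java.util.Properties as in both listings, shows the two knobs side by side. The class CacheTuning and its methods are hypothetical, and the 1000 ms value is only an illustrative assumption, not a tuning recommendation.

```java
import java.util.Properties;

// Hypothetical helper: two ways to make KTable results reach the output topic sooner.
final class CacheTuning {

    // Option A: disable the record cache, so every KTable update is forwarded.
    static Properties withCachingDisabled(Properties base) {
        Properties p = new Properties();
        p.putAll(base);
        // same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
        p.setProperty("cache.max.bytes.buffering", "0");
        return p;
    }

    // Option B: keep the cache but flush it (and commit) more often.
    static Properties withCommitInterval(Properties base, long commitIntervalMs) {
        Properties p = new Properties();
        p.putAll(base);
        // same key as StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
        p.setProperty("commit.interval.ms", Long.toString(commitIntervalMs));
        return p;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "wordcount-application");
        base.setProperty("bootstrap.servers", "192.168.1.93:9092");
        System.out.println(withCachingDisabled(base).getProperty("cache.max.bytes.buffering"));
        System.out.println(withCommitInterval(base, 1000L).getProperty("commit.interval.ms"));
    }
}
```

Lowering commit.interval.ms keeps the cache's deduplication of repeated updates to the same key while bounding how long a count can wait before it is emitted. The trade-off is more frequent commits.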