Hem*_*gle 6 java scala apache-kafka apache-kafka-streams
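I am exploring the Kafka Streams API and have implemented the sample WordCountExample in both Scala and Java, but when I run the Scala code it produces output very slowly compared to the Java example.
I also observed that when I produce a record on the wordCountInput topic, it appears immediately in the sinkWordCount topic. With the Scala code, however, the word count result does not show up immediately in the wordCountOutput topic, whereas with the Java code everything works fine and I can see the output in the wordCountOutput topic immediately.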
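To make the expected contents of wordCountOutput concrete, here is a minimal Kafka-free sketch of the counting logic (lowercase each line, split on non-word characters, keep a running count per word). The class and method names are hypothetical, and the in-memory map only stands in for the state that Kafka Streams maintains; it says nothing about when results are emitted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountSketch {
    // Hypothetical helper: running word counts over a sequence of input lines,
    // using the same lowercase + split("\\W+") steps as the Scala topology.
    static Map<String, Long> countWords(List<String> textLines) {
        Map<String, Long> counts = new TreeMap<>(); // sorted for readable output
        for (String textLine : textLines) {
            for (String word : textLine.toLowerCase().split("\\W+")) {
                counts.merge(word, 1L, Long::sum); // increment the count for this word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello Kafka, hello Streams", "Kafka again");
        System.out.println(countWords(lines)); // prints {again=1, hello=2, kafka=2, streams=1}
    }
}
```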
Java code:
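import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountExample {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092");
        p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("wordCountInput");
        KTable<String, String> wordCounts = textLines
            // Split each lowercased line on runs of non-word characters.
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Re-key by word so that count() aggregates per word.
            .groupBy((key, word) -> word)
            .count()
            .mapValues((key, value) -> value.toString());

        // Pass-through copy of the raw input, plus the running counts.
        textLines.to("sinkWordCount");
        wordCounts.toStream().to("wordCountOutput");

        KafkaStreams streams = new KafkaStreams(builder.build(), p);
        streams.cleanUp();
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
        }));
    }
}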
POM file for the Java program:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany</groupId>
    <artifactId>KafkaStreamExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
</project>
Scala code:
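import java.util.Properties

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.{Serdes, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

object KafkaStreamExample extends App {
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    p
  }

  // The implicit serdes imported above supply the serdes the Scala DSL needs.
  val builder: StreamsBuilder = new StreamsBuilder()
  val sourceStream: KStream[String, String] = builder.stream[String, String]("wordCountInput")
  val wordCounts: KTable[String, String] = sourceStream
    // Split each lowercased line on runs of non-word characters.
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    // Re-key by word so that count() aggregates per word.
    .groupBy((key, word) => word)
    .count()
    .mapValues((key, value) => value.toString)

  // Running counts, plus a pass-through copy of the raw input.
  wordCounts.toStream.to("wordCountOutput")
  sourceStream.to("sinkWordCount")

  val stream: KafkaStreams = new KafkaStreams(builder.build(), props)
  sys.ShutdownHookThread {
    stream.close()
  }
  stream.cleanUp()
  stream.start()
}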
Scala SBT:
name := "KafkaStreamExample"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts( Artifact("javax.ws.rs-api", "jar", "jar"))
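Kafka version: kafka_2.11-2.0.0
Can anyone tell me why the Scala example does not run as fast as the Java example?
Update
Setting the cache.max.bytes.buffering property to 0 in the Scala code gives immediate WordCount output in Scala, but according to this Stack Overflow answer that is not recommended in production.
In the Java code we do not need to set this property, yet the Java version still produces output quickly.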
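For anyone reproducing the workaround from the update, the following is a minimal sketch of the settings involved, written with plain string keys so it compiles without the Kafka jars. The class and method names are hypothetical; the keys are the string values behind StreamsConfig.APPLICATION_ID_CONFIG, StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, and StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG. The serde settings from the examples are omitted for brevity.

```java
import java.util.Properties;

class LowLatencyConfigSketch {
    // Hypothetical helper (not part of the question's code): builds the basic
    // Streams settings plus the cache override described in the update.
    static Properties buildProps(String applicationId, String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("application.id", applicationId);
        p.setProperty("bootstrap.servers", bootstrapServers);
        // A cache size of 0 disables record caching, so each count update is
        // forwarded downstream instead of being buffered.
        p.setProperty("cache.max.bytes.buffering", "0");
        return p;
    }

    public static void main(String[] args) {
        Properties p = buildProps("wordcount-application", "192.168.1.93:9092");
        System.out.println(p.getProperty("cache.max.bytes.buffering")); // prints 0
    }
}
```

The resulting Properties object would be passed to the KafkaStreams constructor in place of the props built in the examples.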