Eli*_*PDC 5 java apache-kafka apache-spark spark-streaming
I am trying to run a Spark Streaming job with Kafka, but I run into a problem when executing my class from Eclipse.
In my main class, JavaDirectKafkaWordCount.class, I create a JavaInputDStream with my Kafka parameters and try to count the words read from a Kafka topic:
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

// Get the lines, split them into words, count the words and print
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();

// Start the computation
jssc.start();
jssc.awaitTermination();
}
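For context, the snippet above assumes that jssc, topicsSet, kafkaParams, and SPACE are already defined earlier in the class. A minimal sketch of that setup could look like the following; the application name, batch interval, topic name, broker address, and group id are placeholder values, not taken from the question.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Inside the main method, before createDirectStream:
// streaming context with a 2-second batch interval (interval chosen arbitrarily)
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

// regex used by the flatMap above to split each record's value into words
Pattern SPACE = Pattern.compile(" ");

// Kafka topic(s) to subscribe to -- "my-topic" is a placeholder
Set<String> topicsSet = new HashSet<>(Arrays.asList("my-topic"));

// Kafka consumer configuration -- broker address and group id are placeholders
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "word-count-group");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);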
When I run it, I get an error.
How can I solve this problem?
小智 6
Change this line:
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
to:
JavaDStream<String> lines = messages.map(x -> x.value());
Then build an uber jar that contains all the dependencies. Below is a pom.xml for Spark 2.2.0; if you are on a different version, change the spark.version property accordingly.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.venk.exercise</groupId>
    <artifactId>test_exercise</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.0</spark.version>
        <spark.kafka.version>2.2.0</spark.kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
After updating the pom.xml, run the command mvn clean compile assembly:single, and then submit the job with the following command:
bin/spark-submit --class edu.hw.test.SparkStreamingKafkaConsumer jar/test_exercise-0.0.1-SNAPSHOT-jar-with-dependencies.jar <application-arguments>
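As a purely illustrative example: if the application's main method happened to take the Kafka broker list and the topic name as its two arguments, a local test run could look like the line below. The --master value and both arguments are placeholders, not part of the original answer.

bin/spark-submit --master local[2] --class edu.hw.test.SparkStreamingKafkaConsumer jar/test_exercise-0.0.1-SNAPSHOT-jar-with-dependencies.jar localhost:9092 my-topic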