java.lang.IllegalArgumentException: Invalid lambda deserialization

Eli*_*PDC 5 java apache-kafka apache-spark spark-streaming

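I am trying to run a Spark Streaming job with Kafka, but I hit a problem when running my class from Eclipse.

In my main class, JavaDirectKafkaWordCount, I create a JavaInputDStream from my Kafka parameters and try to count the words read from a Kafka topic: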

    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);
    lines.print();
    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }

I get this error:

    java.lang.IllegalArgumentException: Invalid lambda deserialization

How can I fix this?

小智 6

Change this line:

    JavaDStream<String> lines = messages.map(ConsumerRecord::value);

to:

    JavaDStream<String> lines = messages.map(x -> x.value());
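Some background that may help explain the message: a serializable Java lambda or method reference is written to the stream as a java.lang.invoke.SerializedLambda, and on the reading side it is rebuilt by a compiler-generated $deserializeLambda$ method in the class that declared it. When that method cannot match the incoming description, it throws IllegalArgumentException with the text "Invalid lambda deserialization". The sketch below is not from the original post; LambdaRoundTrip, SerFn, roundTrip, viaMethodRef and viaLambda are made-up names, and SerFn only stands in for Spark's serializable function interfaces. It round-trips both forms through Java serialization inside a single JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaRoundTrip {

    /** Hypothetical stand-in for a serializable function interface. */
    public interface SerFn<T, R> extends Function<T, R>, Serializable {}

    /** Serializes fn to bytes and reads it back with plain Java serialization. */
    @SuppressWarnings("unchecked")
    static <T, R> SerFn<T, R> roundTrip(SerFn<T, R> fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                // The lambda is replaced by a SerializedLambda when written.
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                // Reading it back goes through LambdaRoundTrip.$deserializeLambda$.
                return (SerFn<T, R>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    /** Round-trips a method reference, then applies it. */
    static int viaMethodRef(String s) {
        SerFn<String, Integer> fn = String::length;
        return roundTrip(fn).apply(s);
    }

    /** Round-trips the equivalent explicit lambda, then applies it. */
    static int viaLambda(String s) {
        SerFn<String, Integer> fn = x -> x.length();
        return roundTrip(fn).apply(s);
    }

    public static void main(String[] args) {
        System.out.println(viaMethodRef("hello"));
        System.out.println(viaLambda("hello"));
    }
}
```

Both calls succeed here because the same class file writes and reads the lambda. In a Spark job the reading side is an executor, so the exception may point to the executor loading a different (or differently compiled) version of the declaring class than the driver used. That is an assumption about the asker's setup, not something the post confirms.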

  • Please add some explanation (3 upvotes)

ven*_*ata 1

Build an uber jar that contains all dependencies. Below is a pom.xml for Spark 2.2.0; if you use a different version, change the spark.version property accordingly.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.venk.exercise</groupId>
    <artifactId>test_exercise</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.0</spark.version>
        <spark.kafka.version>2.2.0</spark.kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>

    </build>
</project>

After changing pom.xml, run mvn clean compile assembly:single, then submit the job with the following command:

bin/spark-submit  --class edu.hw.test.SparkStreamingKafkaConsumer jar/test_exercise-0.0.1-SNAPSHOT-jar-with-dependencies.jar <application-arguments>