Posted by Suc*_*mar

Flink application ClassCastException

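I have a Flink application that reads from Kafka and writes the data back to Kafka.

The application runs without problems when I start it from IntelliJ IDEA, but when I submit the shadow JAR to a Flink cluster I get a ClassCastException. Can I get some help figuring out what I am doing wrong here?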

Exception trace:

Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2205)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2168)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1422)
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2517)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2424)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2233)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2501)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2395)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2233)


Code used:

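// Kafka source that reads String values from "topic", starting at the earliest offset.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("topic")
        .setGroupId("grp")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> eventStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .name("event-stream");

// sinkTo() expects a Sink and returns a DataStreamSink, so it cannot be chained into the
// DataStream<String> assignment. The original post passed the placeholder "kafka" here;
// kafkaSink stands in for the KafkaSink<String> that the post does not show.
eventStream.sinkTo(kafkaSink);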

Build file: flinkVersion = 1.15.0

 //flinkShadowJar "org.apache.flink:flink-connector-kafka:${flinkVersion}"

    // …

java apache-flink
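
A note on reading the exception: it names the same class, org.apache.kafka.clients.consumer.OffsetResetStrategy, as both the value and the field type. In Java that pattern usually means the class was loaded by two different class loaders, since the JVM treats such copies as unrelated types. Whether that is what happens in this job (for example, Kafka classes present both in the shadow JAR and on the cluster's classpath) is an assumption and is not confirmed by the post. The sketch below only demonstrates the general JVM behaviour; ClassLoaderClashDemo, IsolatedLoader and demo.SharedType are hypothetical names made up for the illustration and have nothing to do with Flink or Kafka.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ClassLoaderClashDemo {

    /** Hypothetical loader that defines a class itself instead of asking its parent. */
    static final class IsolatedLoader extends ClassLoader {
        Class<?> define(String name, byte[] bytes) {
            return defineClass(name, bytes, 0, bytes.length);
        }
    }

    /** Builds the class-file bytes of an empty public class (no fields, no methods). */
    static byte[] emptyClassBytes(String binaryName) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(0xCAFEBABE);             // magic number
            out.writeShort(0);                    // minor version
            out.writeShort(52);                   // major version (Java 8 format)
            out.writeShort(5);                    // constant pool count (4 entries + 1)
            out.writeByte(7); out.writeShort(2);  // #1 Class, name at #2
            out.writeByte(1); out.writeUTF(binaryName.replace('.', '/')); // #2 Utf8
            out.writeByte(7); out.writeShort(4);  // #3 Class, name at #4
            out.writeByte(1); out.writeUTF("java/lang/Object");           // #4 Utf8
            out.writeShort(0x0021);               // ACC_PUBLIC | ACC_SUPER
            out.writeShort(1);                    // this_class  = #1
            out.writeShort(3);                    // super_class = #3
            out.writeShort(0);                    // interfaces count
            out.writeShort(0);                    // fields count
            out.writeShort(0);                    // methods count
            out.writeShort(0);                    // attributes count
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Defines the same bytes through two separate loaders.
     * Returns { names equal, same Class object, first assignable from second }.
     */
    static boolean[] compare() {
        String name = "demo.SharedType";
        byte[] bytes = emptyClassBytes(name);
        Class<?> a = new IsolatedLoader().define(name, bytes);
        Class<?> b = new IsolatedLoader().define(name, bytes);
        return new boolean[] {
            a.getName().equals(b.getName()),
            a == b,
            a.isAssignableFrom(b)
        };
    }

    public static void main(String[] args) {
        boolean[] r = compare();
        System.out.println("same name:         " + r[0]);
        System.out.println("same Class object: " + r[1]);
        System.out.println("assignable:        " + r[2]);
    }
}
```

Running main should report that the names match while the Class objects differ and are not assignable to each other, which is the situation the exception message describes. If the same thing is happening in the job, checking which JARs provide the Kafka client classes on the cluster and in the shadow JAR would be a reasonable first step.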

