我一直致力于更新从 Kafka 读取然后写入 Kafka 的 Flink 处理器(Flink 1.9 版)。我们已经编写了这个处理器来运行 Kafka 0.10.2 集群,现在我们已经部署了一个运行 2.2 版的新 Kafka 集群。因此,我开始更新处理器以使用最新的 FlinkKafkaConsumer 和 FlinkKafkaProducer(按照 Flink 文档的建议)。但是,我遇到了 Kafka 制作人的一些问题。我无法使用不推荐使用的构造函数来序列化数据(并不奇怪),而且我无法在网上找到任何关于如何实现序列化程序的实现或示例(所有示例都使用较旧的 Kafka 连接器)
当前的实现(对于Kafka 0.10.2)如下
FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
(FlinkKafkaPartitioner) null
);
Run Code Online (Sandbox Code Playgroud)
尝试实现以下 FlinkKafkaProducer 时
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
null
);
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)
Run Code Online (Sandbox Code Playgroud)
我一直无法弄清楚为什么。FlinkKafkaProducer 的构造函数也已弃用,当我尝试实现未弃用的构造函数时,我无法弄清楚如何序列化数据。以下是它的外观:
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new KafkaSerializationSchema<String>() {
@Override
public …Run Code Online (Sandbox Code Playgroud)