小编Tim*_*son的帖子

如何为 Kafka 2.2 实现 FlinkKafkaProducer 序列化器

我一直致力于更新从 Kafka 读取然后写入 Kafka 的 Flink 处理器(Flink 1.9 版)。我们已经编写了这个处理器来运行 Kafka 0.10.2 集群,现在我们已经部署了一个运行 2.2 版的新 Kafka 集群。因此,我开始更新处理器以使用最新的 FlinkKafkaConsumer 和 FlinkKafkaProducer(按照 Flink 文档的建议)。但是,我遇到了 Kafka 制作人的一些问题。我无法使用不推荐使用的构造函数来序列化数据(并不奇怪),而且我无法在网上找到任何关于如何实现序列化程序的实现或示例(所有示例都使用较旧的 Kafka 连接器)

当前的实现(对于Kafka 0.10.2)如下

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );
Run Code Online (Sandbox Code Playgroud)

尝试实现以下 FlinkKafkaProducer 时

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)
Run Code Online (Sandbox Code Playgroud)

我一直无法弄清楚为什么。FlinkKafkaProducer 的构造函数也已弃用,当我尝试实现未弃用的构造函数时,我无法弄清楚如何序列化数据。以下是它的外观:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-flink

6
推荐指数
1
解决办法
3489
查看次数

标签 统计

apache-flink ×1

apache-kafka ×1

java ×1