Tim*_*son 6 java apache-kafka apache-flink
我一直致力于更新从 Kafka 读取然后写入 Kafka 的 Flink 处理器(Flink 1.9 版)。我们已经编写了这个处理器来运行 Kafka 0.10.2 集群,现在我们已经部署了一个运行 2.2 版的新 Kafka 集群。因此,我开始更新处理器以使用最新的 FlinkKafkaConsumer 和 FlinkKafkaProducer(按照 Flink 文档的建议)。但是,我遇到了 Kafka 制作人的一些问题。我无法使用不推荐使用的构造函数来序列化数据(并不奇怪),而且我无法在网上找到任何关于如何实现序列化程序的实现或示例(所有示例都使用较旧的 Kafka 连接器)
当前的实现(对于Kafka 0.10.2)如下
FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
(FlinkKafkaPartitioner) null
);
Run Code Online (Sandbox Code Playgroud)
尝试实现以下 FlinkKafkaProducer 时
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new SimpleStringSchema(),
producerProps,
null
);
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)
Run Code Online (Sandbox Code Playgroud)
我一直无法弄清楚为什么。FlinkKafkaProducer 的构造函数也已弃用,当我尝试实现未弃用的构造函数时,我无法弄清楚如何序列化数据。以下是它的外观:
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
"playerSessions",
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
return null;
}
},
producerProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
Run Code Online (Sandbox Code Playgroud)
但是我不明白如何实现 KafkaSerializationSchema 并且我在网上或 Flink 文档中找不到任何示例。
有没有人有任何实现这个的经验或关于为什么 FlinkProducer 在步骤中得到 NullPointerException 的任何提示?
Zee*_*han 10
如果您只是将字符串发送到 Kafka:
public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{
private String topic;
public ProducerStringSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
}
}
Run Code Online (Sandbox Code Playgroud)
发送 Java 对象:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{
private String topic;
private ObjectMapper mapper;
public ObjSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
byte[] b = null;
if (mapper == null) {
mapper = new ObjectMapper();
}
try {
b= mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
// TODO
}
return new ProducerRecord<byte[], byte[]>(topic, b);
}
}
Run Code Online (Sandbox Code Playgroud)
在你的代码中
.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic),
params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3489 次 |
| 最近记录: |