Fra*_*sco 6 apache-kafka apache-kafka-streams
我正在使用Apache Kafka,我一直在尝试使用Kafka Streams功能.我想要实现的是非常简单的,至少在单词中,它可以通过常规的普通消费者/生产者方法轻松实现:
最初我认为我可以创建一个自定义接收器或注入某种端点解析器,以便以编程方式为每条消息定义主题名称,尽管最终找不到任何方法.所以我深入研究代码并找到了ProducerInterceptor类(引自JavaDoc):
一个插件接口,允许您在生成器发布到Kafka集群之前拦截(并可能改变)生成器接收的记录.
它是onSend方法:
这是从KafkaProducer.send(ProducerRecord)和KafkaProducer.send(ProducerRecord,Callback)方法调用之前,键和值得到序列化并分配了分区(如果未在ProducerRecord中指定分区).
这对我来说似乎是完美的解决方案,因为我可以有效地返回一个新的ProducerRecord,其中包含我想要的主题名称.虽然显然存在一个错误(我在他们的JIRA上打开了一个问题:KAFKA-4691),并且当键和值已经被序列化时调用该方法.我认为此时不进行额外的反序列化是可以接受的.
我向你提出的经验丰富,知识渊博的用户问题将是你的意见和建议,以及关于如何有效和优雅地实现它的任何建议.
在此先感谢您的帮助/意见/建议/想法.
以下是我尝试过的一些代码片段:
public static void main(String[] args) throws Exception {
StreamsConfig streamingConfig = new StreamsConfig(getProperties());
StringDeserializer stringDeserializer = new StringDeserializer();
StringSerializer stringSerializer = new StringSerializer();
MyObjectSerializer myObjectSerializer = new MyObjectSerializer();
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.addSource("SOURCE", stringDeserializer, myObjectSerializer, Pattern.compile("input-.*"));
.addProcessor("PROCESS", MyCustomProcessor::new, "SOURCE");
System.out.println("Starting PurchaseProcessor Example");
KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
streaming.start();
System.out.println("Now started PurchaseProcessor Example");
}
private static Properties getProperties() {
Properties props = new Properties();
.....
.....
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.test.kafka.streams.OutputTopicRouterInterceptor");
return props;
}
Run Code Online (Sandbox Code Playgroud)
OutputTopicRouterInterceptor onSend实现:
@Override
public ProducerRecord<String, MyObject> onSend(ProducerRecord<String, MyObject> record) {
MyObject obj = record.value();
String topic = computeTopicName(obj);
ProducerRecord<String, MyObject> newRecord = new ProducerRecord<String, MyObject>(topic, record.partition(), record.timestamp(), record.key(), obj);
return newRecord;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1627 次 |
| 最近记录: |