Kafka Streams动态路由(ProducerInterceptor可能是一个解决方案吗?)

Fra*_*sco 6 apache-kafka apache-kafka-streams

我正在使用Apache Kafka,我一直在尝试使用Kafka Streams功能.我想要实现的是非常简单的,至少在单词中,它可以通过常规的普通消费者/生产者方法轻松实现:

  1. 从动态主题列表中读取a
  2. 对邮件做一些处理
  3. 将消息推送到另一个主题,该主题根据消息内容计算名称

最初我认为我可以创建一个自定义接收器或注入某种端点解析器,以便以编程方式为每条消息定义主题名称,尽管最终找不到任何方法.所以我深入研究代码并找到了ProducerInterceptor类(引自JavaDoc):

一个插件接口,允许您在生成器发布到Kafka集群之前拦截(并可能改变)生成器接收的记录.

它是onSend方法:

这是从KafkaProducer.send(ProducerRecord)和KafkaProducer.send(ProducerRecord,Callback)方法调用之前,键和值得到序列化并分配了分区(如果未在ProducerRecord中指定分区).

这对我来说似乎是完美的解决方案,因为我可以有效地返回一个新的ProducerRecord,其中包含我想要的主题名称.虽然显然存在一个错误(我在他们的JIRA上打开了一个问题:KAFKA-4691),并且当键和值已经被序列化时调用该方法.我认为此时不进行额外的反序列化是可以接受的.

我向你提出的经验丰富,知识渊博的用户问题将是你的意见和建议,以及关于如何有效和优雅地实现它的任何建议.

在此先感谢您的帮助/意见/建议/想法.

以下是我尝试过的一些代码片段:

public static void main(String[] args) throws Exception {

    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    StringDeserializer stringDeserializer = new StringDeserializer();
    StringSerializer stringSerializer = new StringSerializer();

    MyObjectSerializer myObjectSerializer = new MyObjectSerializer();

    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.addSource("SOURCE", stringDeserializer, myObjectSerializer, Pattern.compile("input-.*"));

    .addProcessor("PROCESS", MyCustomProcessor::new, "SOURCE");


    System.out.println("Starting PurchaseProcessor Example");
    KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
    streaming.start();
    System.out.println("Now started PurchaseProcessor Example");

}

private static Properties getProperties() {
    Properties props = new Properties();
    .....
    .....
    props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.test.kafka.streams.OutputTopicRouterInterceptor");

    return props;
}
Run Code Online (Sandbox Code Playgroud)

OutputTopicRouterInterceptor onSend实现:

@Override
public ProducerRecord<String, MyObject> onSend(ProducerRecord<String, MyObject> record) {
    MyObject obj = record.value();

    String topic = computeTopicName(obj);

    ProducerRecord<String, MyObject> newRecord = new ProducerRecord<String, MyObject>(topic, record.partition(), record.timestamp(), record.key(), obj);
    return newRecord;
}
Run Code Online (Sandbox Code Playgroud)