Kafka Streams 输出主题可以在单独的集群上吗?

use*_*024 2 apache-kafka apache-kafka-streams kafka-topic

我有一个主题,其中所有日志都推送到集中主题,但如果可能,我想将其中一些记录过滤到单独的主题和集群。

谢谢

Vas*_*kyi 5

Kafka 流不允许使用来自不同 Kafka 集群的源和输出主题创建流。所以下面的代码对你不起作用

streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)
Run Code Online (Sandbox Code Playgroud)

在这种情况下,它期望 outputTopicName 来自与主题 sourceTopicName 相同的集群。

作为一种解决方法,为了将消息从另一个集群发送到输出主题,您可以使用额外创建的 KafkaProducer,其属性bootstrap.servers将指向外部集群和KStream.foreach()方法

streamsBuilder.stream(sourceTopicName)
    .filter((key, value) -> ..)
    .foreach((key, value) -> 
        sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value);


public static void sendMessage(KafkaProducer<String, String> kafkaProducer, 
                               String destinationTopicName, String key, String value) {
    try {
        kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value));
    } catch (RuntimeException ex) {
        log.error(errorMessage, ex);
    }
}
Run Code Online (Sandbox Code Playgroud)

另一种选择是在您的 Kafka 集群中创建输出主题,该主题将过滤消息并在两个集群之间设置Kafka 镜像(因此消息将从一个主题复制到另一个集群的第二个主题)。