use*_*024 2 apache-kafka apache-kafka-streams kafka-topic
我有一个主题,其中所有日志都推送到集中主题,但如果可能,我想将其中一些记录过滤到单独的主题和集群。
谢谢
Kafka 流不允许使用来自不同 Kafka 集群的源和输出主题创建流。所以下面的代码对你不起作用
streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)
Run Code Online (Sandbox Code Playgroud)
在这种情况下,它期望 outputTopicName 来自与主题 sourceTopicName 相同的集群。
作为一种解决方法,为了将消息从另一个集群发送到输出主题,您可以使用额外创建的 KafkaProducer,其属性bootstrap.servers将指向外部集群和KStream.foreach()方法。
streamsBuilder.stream(sourceTopicName)
.filter((key, value) -> ..)
.foreach((key, value) ->
sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value);
public static void sendMessage(KafkaProducer<String, String> kafkaProducer,
String destinationTopicName, String key, String value) {
try {
kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value));
} catch (RuntimeException ex) {
log.error(errorMessage, ex);
}
}
Run Code Online (Sandbox Code Playgroud)
另一种选择是在您的 Kafka 集群中创建输出主题,该主题将过滤消息并在两个集群之间设置Kafka 镜像(因此消息将从一个主题复制到另一个集群的第二个主题)。
| 归档时间: |
|
| 查看次数: |
1552 次 |
| 最近记录: |