Vit*_*lar 2 apache-kafka spring-boot spring-kafka
我正在使用 Spring Boot 开始使用 Apache Kafka。我想实现以下事实:同一服务的两个实例以循环方式消耗同一主题的消息。
因此,实例 1 接收主题的第一条消息,实例 2 接收第二条消息,实例 1 接收第三条消息,依此类推。
这是我当前的配置:
spring:
kafka:
consumer:
bootstrap-servers: rommel.local:9092
group-id: core
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
producer:
bootstrap-servers: rommel.local:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
missing-topics-fatal: false
ack-mode: record
Run Code Online (Sandbox Code Playgroud)
这是我的听众:
@KafkaListener(topics = "new-topic", groupId = "core")
public void consume(String payload) {
System.out.println("Data received: " + payload);
}
Run Code Online (Sandbox Code Playgroud)
问题是,当我运行应用程序的两个实例时,只有一个实例收到该主题的消息。
我在日志中意识到了这一点:
Instance 1:
core: partitions assigned: [new-topic-0]
Run Code Online (Sandbox Code Playgroud)
Instance 2:
core: partitions assigned: []
Run Code Online (Sandbox Code Playgroud)
那么当我杀死实例1时。被new-topic-0分配给实例 2。
那么如何实现两个实例循环获取同一主题消息的场景呢?
谢谢!!
首先,让我们弄清楚一些事情:
在Kafka中,并行的基本单位是分区。如果您的主题只有一个分区,则无论您配置多少个消费者实例/线程,只有其中一个会收到消息 - 其他实例将保持空闲状态。如果当前处理消息的实例出现故障,则分区将通过称为“重新平衡”的机制分配给其他消费者实例/线程,然后该实例将开始处理消息。这就是您在这里观察到的行为。
严格来说,即使有多个分区,也不会是循环。分区将均匀分布在消费者线程/进程之间。因此,每个消费者实例/线程都会有一个或多个专门分配给它的分区,并且它只会处理这些分区中的消息。
当您向主题生成消息时,消息将到达的位置取决于您将使用的分区键。这意味着具有相同分区键的消息将始终位于同一分区中(除非您在一段时间后增加分区数量)。这也意味着消息的顺序将在给定分区内保持,而不是跨分区。
在旧版本的 Kafka 中,如果在生成消息时没有分区键,则消息可能会跨分区以循环方式生成。然而,在最新版本的 Kafka 中,即使没有分区键,消息也会根据生产者配置的参数进行批处理batch.size,然后linger.ms再发送出去。
| 归档时间: |
|
| 查看次数: |
8130 次 |
| 最近记录: |