Nik*_*las 2 java spring publish-subscribe apache-kafka spring-boot
我有两个消费者组first并使用 Spring Boot 版本2.1.8second订阅了主题。我有 2 项服务使用每个单独的消费者组。my-topic
使用最小的设置我定义了一个监听器:
@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
Run Code Online (Sandbox Code Playgroud)
使用的列表kafka-consumer-groups显示以下内容:
root@8d49c1b2c3bf:/# kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first my-topic 2 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2
first my-topic 0 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2
first my-topic 1 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
Run Code Online (Sandbox Code Playgroud)
我希望组中的消费者first只使用0和2个分区,所以我尝试用监听器来实现:
@KafkaListener(topicPartitions = {
@TopicPartition(
topic = "my-topic",
partitions = { "0", "2" }
)}
)
public void consume(@Payload String message) {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
Run Code Online (Sandbox Code Playgroud)
为什么使用相同命令的列表与上面显示的内容不同,除了first组的分区 1 之外?此外,CONSUMER-ID值和进一步为空,并且根本不@KafkaListener接收该组的任何消息(组中的侦听器工作原理相同)。如何修复它?firstsecond
Consumer group 'first' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first my-topic 2 0 0 0 - - -
first my-topic 0 0 0 0 - - -
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
Run Code Online (Sandbox Code Playgroud)
手动分配所有分区由于您将消费者线程分配给特定分区,Kafka 将使用assign()方法而不使用组协调。每个消费者独立行动,即使它与另一个消费者共享一个groupId
\n\n\n假设\xe2\x80\x99s 表示您希望始终从所有分区读取所有记录(例如,当使用压缩主题加载分布式缓存时),手动分配分区而不使用 Kafka\xe2\x80 可能会很有用\x99s 群组管理。
\n
但检查消费者位置的命令只会显示使用组协调的消费者的位置
\n\n\n\n\n\n\n\n\n有时了解消费者的立场很有用。我们有一个工具可以显示消费者组中所有消费者的位置以及它们距离日志末尾有多远。在使用名为 my-topic 的主题的名为 my-group 的消费者组上运行此工具
\n
\n\n\n手动分区分配不使用组协调,因此消费者故障不会导致分配的分区重新平衡。每个消费者独立行动,即使它与另一个消费者共享一个groupId。为了避免偏移提交冲突,您通常应该确保每个消费者实例的 groupId 是唯一的。
\n
侦听器不消费消息的最后一个原因是因为偏移量,默认情况下偏移量设置为最新,您需要特别指定偏移位置,如下所示
\n\n@KafkaListener(id = "thing2", topicPartitions =\n { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),\n @TopicPartition(topic = "topic2", partitions = "0",\n partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))\n })\n public void listen(ConsumerRecord<?, ?> record) {\n ...\n }\nRun Code Online (Sandbox Code Playgroud)\n\n有关偏移量的更多信息
\n\n\n\n\n第一个构造函数采用 TopicPartitionOffset 参数数组来显式指示容器要使用哪些分区(使用使用者的 allocate() 方法)并带有可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内当前最后一个偏移量的。提供了带有附加布尔参数的 TopicPartitionOffset 构造函数。如果这是真的,则初始偏移(正或负)相对于该消费者的当前位置。偏移量在容器启动时应用。
\n
注意:您可以在partitions 或partitionOffsets 属性中指定每个分区,但不能同时指定两者。
\n| 归档时间: |
|
| 查看次数: |
4457 次 |
| 最近记录: |