ca9*_*3d9 1 python apache-kafka kafka-consumer-api kafka-python
我有以下 Kafka 消费者,如果将 分配group_id给 None,它会很好地工作 - 它收到了所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
Run Code Online (Sandbox Code Playgroud)
group_id但是,如果我将其设置为某个值,它不会收到任何内容。我尝试运行测试生产者来发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入第 497 代组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新的分区分配:[] 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 Consumer.py 为组 my_group 设置新分配的分区 set()
一个Topic的一个分区只能被同一个ConsumerGroup内的一个Consumer消费。
如果您不设置group.id,KafkaConsumer将为您生成一个新的随机group.id。由于此 group.id 是唯一的,您将看到数据正在被消耗。
如果您有多个使用相同 group.id 运行的消费者,则只有一个消费者会读取数据,而另一个消费者则保持空闲状态,不消费任何数据。
| 归档时间: |
|
| 查看次数: |
4504 次 |
| 最近记录: |