Joh*_*yna 4 python kafka-consumer-api kafka-python
使用kafka-python-1.0.2。
如果我有一个包含10个分区的主题,那么如何遍历各个分区和消息,同时提交一个特定的分区。我似乎无法在文档中或其他任何地方找到此示例
从文档中,我想使用:
consumer.commit(offset=offsets)
具体来说,如何创建偏移量所需的分区和OffsetAndMetadata字典(字典,可选)– {TopicPartition:OffsetAndMetadata}。
我希望函数调用就像这样:
consumer.commit(partition, offset)
但这似乎并非如此。
提前致谢。
小智 6
没有必要使用元数据。看这个例子:
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = createKafkaConsumer()
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
tp: OffsetAndMetadata(offset, None)
})
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助。
这样看来,我可能已经想通了,很有趣,当您写下问题时会如何发生。这似乎可行:
meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)
Run Code Online (Sandbox Code Playgroud)
需要进行更多测试,但是如果有任何更改,它将进行更新。
from kafka import KafkaConsumer
from kafka import TopicPartition
TOPIC = "test_topic"
PARTITION = 0
consumer = KafkaConsumer(
group_id=TOPIC,
auto_offset_reset="earliest",
bootstrap_servers="localhost:9092",
request_timeout_ms=100000,
session_timeout_ms=99000,
max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)
# format: topic, partition
consumer.assign([topic_partition])
consumer.seek(topic_partition, 1660000)
# format: TopicPartition, offset. 1660000 is the offset been set.
for message in consumer:
# do something
Run Code Online (Sandbox Code Playgroud)
- -编辑 - - -
寻找更好的方法来做到这一点。
topic_partition = TopicPartition(TOPIC,
message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()
Run Code Online (Sandbox Code Playgroud)
这将从kafka获取的消息中提取分区信息,并保存手动分配分区的子句,从而在程序中需要设置多个分区的偏移量(并不罕见)时带来方便。
ps:为了确保一个分区只设置一次,应该根据您的应用程序设置一个标志。
| 归档时间: |
|
| 查看次数: |
7815 次 |
| 最近记录: |