kafka-python-如何提交分区?

Joh*_*yna 4 python kafka-consumer-api kafka-python

使用kafka-python-1.0.2。

如果我有一个包含10个分区的主题,那么如何遍历各个分区和消息,同时提交一个特定的分区。我似乎无法在文档中或其他任何地方找到此示例

从文档中,我想使用:

consumer.commit(offset=offsets)

具体来说,如何创建偏移量所需的分区和OffsetAndMetadata字典(字典,可选)– {TopicPartition:OffsetAndMetadata}。

我希望函数调用就像这样:

consumer.commit(partition, offset)

但这似乎并非如此。

提前致谢。

小智 6

没有必要使用元数据。看这个例子:

from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = createKafkaConsumer()
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
    tp: OffsetAndMetadata(offset, None)
})
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助。


Joh*_*yna 5

这样看来,我可能已经想通了,很有趣,当您写下问题时会如何发生。这似乎可行:

meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)
Run Code Online (Sandbox Code Playgroud)

需要进行更多测试,但是如果有任何更改,它将进行更新。

  • 这就是这样做的方法,我联系了 GitLab 上的 kafka 团队。回应:“元数据实际上只是一个不透明的字符串。您也可以传递 None。内部没有使用元数据,如果需要,它可以作为您存储特定于应用程序的数据的一种方式。但很少有人真正使用该功能,所以如果你走那条路,要小心。 (2认同)

bux*_*oum 5

from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "test_topic"
PARTITION = 0

consumer = KafkaConsumer(
    group_id=TOPIC,
    auto_offset_reset="earliest",
    bootstrap_servers="localhost:9092",
    request_timeout_ms=100000,
    session_timeout_ms=99000,
    max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)
# format: topic, partition
consumer.assign([topic_partition])
consumer.seek(topic_partition, 1660000)
# format: TopicPartition, offset. 1660000 is the offset been set.
for message in consumer:
    # do something
Run Code Online (Sandbox Code Playgroud)
  1. 这只分配一个分区并为该分区设置偏移量,如果有多个分区,则需要为每个分区分配一个,然后设置偏移量。
  2. aalmeida88的答案有时对我有用,在某些情况下,它确实有效,aalmeida88给了我寻求的想法,看来这也是一个有用的方法。
  3. 您可能需要注意的另一件事是,当您自己分配分区时,kafka manager似乎无法获取消费者信息,这可能是因为您分配分区时,您在kafka而不是zookeeper中设置它,所以kafka manager可能会没有得到那个信息。希望能帮助到你!

- -编辑 - - -

寻找更好的方法来做到这一点。

topic_partition = TopicPartition(TOPIC,
                                 message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()
Run Code Online (Sandbox Code Playgroud)

这将从kafka获取的消息中提取分区信息,并保存手动分配分区的子句,从而在程序中需要设置多个分区的偏移量(并不罕见)时带来方便。

ps:为了确保一个分区只设置一次,应该根据您的应用程序设置一个标志。