Art*_*are 6 python apache-kafka kafka-consumer-api confluent-platform confluent-kafka-python
到目前为止,这是我尝试过的:
from confluent_kafka import Consumer
c = Consumer({... several security/server settings skipped...
'auto.offset.reset': 'beginning',
'group.id': 'my-group'})
c.subscribe(['my.topic'])
msg = poll(30.0) # msg is of None type.
Run Code Online (Sandbox Code Playgroud)
msg
几乎总是最终成为None
这样。我认为问题可能是'my-group'
已经消耗了所有消息'my.topic'
......但我不在乎消息是否已经被消耗 - 我仍然需要最新的消息。具体来说,我需要最新消息的时间戳。
我又尝试了一些,从这里看来,该主题中可能有 25 条消息,但我不知道如何获取它们:
a = c.assignment()
print(a) # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets) # Outputs: (25, 25)
Run Code Online (Sandbox Code Playgroud)
如果因为该主题从未写入任何内容而没有消息,我该如何确定?如果是这样,我如何确定该主题存在了多长时间?我正在编写一个脚本,自动删除过去 X 天内未写入的任何主题(最初为 14 个 - 可能会随着时间的推移进行调整。)
小智 9
我遇到了同样的问题,没有这方面的例子。就我而言,有一个分区,我需要读取最后一条消息,以了解该消息中的一些信息来设置我拥有的消费者/生产者组件。
逻辑是开始Consumer
,订阅主题,轮询消息 -> 触发on_assign
,通过将修改后的分区分配回来,发生倒带。完成后on_assign
,轮询msg
继续并读取主题中的最后一条消息。
settings = {
"bootstrap.servers": "my.kafka.server",
"group.id": "my-work-group",
"client.id": "my-work-client-1",
"enable.auto.commit": False,
"session.timeout.ms": 6000,
"default.topic.config": {"auto.offset.reset": "largest"},
}
consumer = Consumer(settings)
def on_assign(a_consumer, partitions):
# get offset tuple from the first partition
last_offset = a_consumer.get_watermark_offsets(partitions[0])
# position [1] being the last index
partitions[0].offset = last_offset[1] - 1
consumer.assign(partitions)
consumer.subscribe(["test-topic"], on_assign=on_assign)
msg = consumer.poll(6.0)
Run Code Online (Sandbox Code Playgroud)
现在msg
里面有最后一条消息。
归档时间: |
|
查看次数: |
6289 次 |
最近记录: |