我不清楚为什么我们需要session.timeout.ms和max.poll.interval.ms以及何时使用一个或另一个或两者?似乎两者都表明时间协调器的上限将等待消费者获得心跳,然后再将其假死.
另外,对于基于KIP-62的 0.10.1.0+版本,它的表现如何?
文档https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html表示"请注意,启用自动提交后,对poll的调用将始终提交由以前的轮询.它不知道实际处理了哪些事件,所以在再次调用poll之前总是处理poll返回的所有事件(或者在调用close()之前,它还将自动提交偏移量).如果是这样的话,如果auto.commit.interval.ms大于处理从前一个接收的消息的时间,它是如何工作的poll()
.
为了使其更具体,请考虑我有以下情况:
enable.auto.commit=true
auto.commit.interval.ms=10
Run Code Online (Sandbox Code Playgroud)
我打电话给poll()
一个循环.
1)在第一次调用时poll()
,我得到1000条消息(偏移2000-3000),处理所有1000条消息需要1毫秒
2)我poll()
再次打电话.在第二次poll()
调用中,它应该提交从前一次返回的最新偏移量3000,poll()
但是由于auto.commit.interval.ms
设置为10 ms,它不会提交偏移量,对吧?
在这种情况下,提交的偏移量将进一步落后于实际处理的最新偏移量?
有人可以澄清/确认吗?
我想增加session.timeout.ms
以允许更长的时间来处理在poll()
呼叫之间接收的消息.但是,当我更改session.timeout.ms
为高于30000的值时,它无法创建Consumer对象并抛出错误.
任何人都可以告诉我为什么不能增加session.timeout.ms
价值或者我错过了什么?
0 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
request.timeout.ms = 40000
check.crcs = true
retry.backoff.ms = 100
ssl.truststore.password = null
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 262144
ssl.cipher.suites = null
ssl.key.password = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.provider = null
sasl.kerberos.service.name = null
session.timeout.ms = 40000
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [server-name:9092]
client.id =
fetch.max.wait.ms = 500
fetch.min.bytes = 50000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
sasl.kerberos.kinit.cmd = /usr/bin/kinit
auto.offset.reset = latest
value.deserializer = class …
Run Code Online (Sandbox Code Playgroud) https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html提到“只要消费者定期发送心跳,就假定它是活着的,并且正在处理消息从它的分区。实际上,轮询消息的行为是导致消费者发送这些心跳的原因。如果消费者停止发送心跳的时间足够长,它的会话将超时,组协调器将认为它已死并触发重新平衡.”
同样https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html指定“代理将通过使用心跳自动检测测试组中的失败进程机制。消费者会定期自动ping集群,让集群知道它是活着的。只要消费者能够做到这一点,它就被认为是活着的,并保留从分配给它的分区中消费的权利。如果它停止心跳的时间比 session.timeout.ms 长,然后它会被认为是死的,它的分区将被分配给另一个进程。”
在我的应用程序中,处理从前一个 poll() 接收的消息可能需要长达数小时才能调用另一个 poll()。注意:我禁用了自动提交,因为我并不总是知道处理所有以前的消息需要多长时间。
a) 这会导致组协调员认为消费者已死亡或不活动吗?
b) 是否有其他方法可以向组协调器发送心跳消息以保持会话处于活动状态?
c) session.timeout.ms 对保持消费者存活/活跃有什么影响吗?
我正在使用Kafka 0.9
消费者API.我需要检查给定使用者组和主题的消费者的当前偏移量,并且没有找到任何列出此信息的命令行工具.我尝试了以下命令
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group_name>
Run Code Online (Sandbox Code Playgroud)
但这并未列出消费者,即使我有消费者在运行/投票.
基于Kafka 0.9新的消费者api ---如何仅仅观看消费者抵消似乎有这方面的命令行工具,但我不知道这是否适合Kafka 0.9
消费者.
任何帮助,将不胜感激.