max.poll.intervals.ms默认设置为int.Max

Jav*_*era 5 apache-kafka apache-kafka-streams

Apache Kafka文档说明:

内部Kafka Streams使用者max.poll.interval.ms默认值已从300000更改为Integer.MAX_VALUE

由于此值用于检测一批记录的处理时间何时超过给定阈值,是否存在这种"无限"值的原因?

它是否使应用程序无响应?或者,当处理时间过长时,Kafka Streams有不同的方式离开消费者群体?

Mic*_*oll 9

它是否使应用程序无响应?或者,当处理时间过长时,Kafka Streams有不同的方式离开消费者群体?

Kafka Streams在此上下文中利用了Kafka消费者客户端的心跳功能,因此从调用中解除了心跳("此应用程序实例是否还活着吗?")poll().两个主要参数是session.timeout.ms(对于心跳线程)和max.poll.interval.ms(对于处理线程),它们的区别在/sf/answers/2783153061/中有更详细的描述.

引入了心跳,以便允许应用程序实例花费大量时间处理记录而不被视为"没有进展"并因此"死亡".例如,你的应用程序可以对一分钟的单个记录进行大量的处理,同时仍然心跳到Kafka"嘿,我还活着,我正在取得进展.但我还没完成处理. 敬请关注."

当然,如果您确实希望您的应用程序实例在轮询记录之间花费超过X秒的时间,那么您可以将max.poll.interval.ms其默认值(Integer.MAX_VALUE)更改为更低的设置,因此如果需要超过X秒处理最新一轮记录.这取决于您的具体用例,这种配置是否有意义 - 在大多数情况下,默认设置是一个安全的赌注.

session.timeout.ms:使用Kafka的组管理工具时用于检测消费者故障的超时.消费者定期发送心跳以指示其对经纪人的活跃性.如果在此会话超时到期之前代理没有收到心跳,则代理将从该组中删除此使用者并启动重新平衡.请注意,该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms的代理配置中配置的允许范围内.

max.poll.interval.ms:使用使用者组管理时poll()调用之间的最大延迟.这为消费者在获取更多记录之前可以闲置的时间量设置了上限.如果在此超时到期之前未调用poll(),则认为使用者失败,并且该组将重新平衡以便将分区重新分配给另一个成员.