Kafka Stream - CommitFailedException:由于该组已经重新平衡并将分区分配给另一个成员,因此无法完成提交

Nan*_*dia 6 apache-kafka

我正在运行一个Kafka Stream应用程序,该应用程序使用来自2个主题的数据并将加入/合并的结果输出到3个主题中.kafka主题有15个分区和3个复制因子.我们有5个卡夫卡经纪人和5个动物园管理员.我正在运行15个Kafka Stream应用程序实例,因此每个应用程序可以有1个分区.Kafka版本 - 0.11.0.0

我在我的kafka流应用程序中得到以下异常:

org.apache.kafka.clients.consumer.CommitFailedException:由于该组已经重新平衡并将分区分配给另一个成员,因此无法完成提交.这意味着后续调用poll()的时间长于配置的max.poll.interval.ms,这通常意味着轮询循环花费了太多时间进行消息处理.您可以通过增加会话超时或通过max.poll.records减少poll()中返回的批量的最大大小来解决此问题.在org.apache的org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)org.apache.kacheka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)org.apache .kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)位于org.apache.kafka.streams.processor的org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307) .internals.StreamTask.access $ 000(StreamTask.java:49)位于org.apache.kafka.streams.processor.internals的org.apache.kafka.streams.processor.internals.StreamTask $ 1.run(StreamTask.java:268) .StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)org.apache.kafka.streams. (StreamTask.java:362)org.apache.kafka.stream.process.Stat..processor.internals.StreamThread $ 3.apply(StreamThread.java:1118)位于org.apache.kafka.streams.processor的org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448). ingals.StreamThread.suspendTasksAndState(StreamThread.java:1110)atg.apache.kafka.streams.processor.internals.StreamThread.access $ 1800(StreamThread.java:73)org.apache.kafka.streams.processor.internals.StreamThread $ RebalanceListener.onPartitionsRevoked(StreamThread.java:218)org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:422)org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:353)org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java) :297)org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(Ka)fkaConsumer.java:1078)org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)2017-08- 09 14:50:49 - [ERROR] [click-live-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks:1453]:

有人可以帮助解决原因和解决方案吗?

此外,当我的一个kafka经纪人失败时,我的kafka流应用程序没有连接到其他经纪人?我已经设定brokers.list=broker1:9092,broker2:9092,broker3:9092,broker4:9092,broker5:9092

Den*_*din 0

根据信息,这是最有可能的解决方案路线:

尝试按照消息中的建议进行操作:

“您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批次的最大大小来解决此问题”