Hin*_*ing 4 apache-kafka spring-boot spring-kafka
有一个耗时的操作(大约10分钟),但卡夫卡在5分钟后重新平衡,甚至我暂停了消费者.消费者方法:
@KafkaListener(topics = {TopicAppoint.EXECUTE_SCHOOL_DATA_STATICS_TASK})
public void receiveMessage(@Payload String payload, Consumer<String, String> consumer) {
Set<TopicPartition> assignment = consumer.assignment();
consumer.pause(assignment);
if (StringUtils.isNotEmpty(payload)) {
SchoolStatisticsTaskDTO staticsTaskDTO = JSONObject.parseObject(payload, SchoolStatisticsTaskDTO.class);
Optional<SchoolStatisticsTaskDO> taskOptional = schoolStatisticsTaskRepository.findById(staticsTaskDTO.getTrackId());
taskOptional.ifPresent(schoolStaticsTaskDO -> {
// handler
});
}
consumer.resume(assignment);
}
Run Code Online (Sandbox Code Playgroud)
这是我的配置:
kafka:
bootstrap-servers: 192.168.0.230:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3
properties:
max.request.size: 12582912
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: dc-fitness-data-consumer-group
properties:
max.partition.fetch.bytes: 12582912
#enable-auto-commit: false
listener:
ack-mode: record
concurrency: 6
Run Code Online (Sandbox Code Playgroud)
日志
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.220 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] (Re-)joining group
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#5-4-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-47, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
13:09:20.225 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] (Re-)joining group
Run Code Online (Sandbox Code Playgroud)
让我们看一下文档一段时间,从pause()方法开始
public void pause(集合分区) 在这里
暂停从请求的分区中获取.将来调用poll(long)将不会从这些分区返回任何记录,直到它们使用resume(Collection)恢复.请注意,此方法不会影响分区订阅.特别是,在使用自动分配时,它不会导致组重新平衡.
因此,从上面的描述pause()方法将暂停分区获取消息但它不会暂停消费者线程,这意味着消费者线程将对poll()暂停的分区执行后续请求但不会获取任何记录
在您的应用程序中:分区暂停,但消费者线程忙于执行耗时的操作超过10分钟而不执行任何轮询请求.
检测消费者故障:因此当轮询停止时,心跳不会发送到群集
订阅一组主题后,当调用poll(long)时,使用者将自动加入该组.poll API旨在确保消费者的活力.只要您继续调用poll,消费者将留在该组中并继续从分配的分区接收消息.在封面下,poll API会定期向服务器发送心跳; 当你停止调用poll(可能是因为抛出了异常)时,就不会发送心跳.如果在服务器收到心跳之前经过了配置的会话超时的一段时间,那么消费者将被踢出组,并且将重新分配其分区.
解决方法:增加以下属性超时,因为预设时间为5分钟,你看到的再平衡为每5分钟(个人将不支持增加超时)在这里
新的Java Consumer现在支持后台线程的心跳.
max.poll.interval.ms在消费者主动离开组之前,有一个新配置控制轮询调用之间的最长时间(默认为5分钟).配置的值request.timeout.ms必须始终大于max.poll.interval.ms,因为这是在消费者重新平衡时JoinGroup请求可以在服务器上阻塞的最长时间,因此我们将其默认值更改为刚好超过5分钟.最后,session.timeout.ms的默认值已调整为10秒,默认值max.poll.records已更改为500.
解决方案2:如果您需要以较少的时间处理大量数据,则会增加更多分区并使用每个线程占用每个分区(这意味着更多并发),可以在5分钟内处理的数据更少
| 归档时间: |
|
| 查看次数: |
2729 次 |
| 最近记录: |