小编Hin*_*ing的帖子

春天卡夫卡总是在5分钟后重新平衡,即使我暂停消费者

有一个耗时的操作(大约10分钟),但卡夫卡在5分钟后重新平衡,甚至我暂停了消费者.消费者方法:

    @KafkaListener(topics = {TopicAppoint.EXECUTE_SCHOOL_DATA_STATICS_TASK})
public void receiveMessage(@Payload String payload, Consumer<String, String> consumer) {

    Set<TopicPartition> assignment = consumer.assignment();
    consumer.pause(assignment);

    if (StringUtils.isNotEmpty(payload)) {
        SchoolStatisticsTaskDTO staticsTaskDTO = JSONObject.parseObject(payload, SchoolStatisticsTaskDTO.class);
        Optional<SchoolStatisticsTaskDO> taskOptional = schoolStatisticsTaskRepository.findById(staticsTaskDTO.getTrackId());
        taskOptional.ifPresent(schoolStaticsTaskDO -> {
            // handler
        });
    }

    consumer.resume(assignment);
}
Run Code Online (Sandbox Code Playgroud)

这是我的配置:

  kafka:
bootstrap-servers: 192.168.0.230:9092
producer:
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
  retries: 3
  properties:
    max.request.size: 12582912
consumer:
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  group-id: dc-fitness-data-consumer-group
  properties:
    max.partition.fetch.bytes: 12582912
  #enable-auto-commit: false
listener:
  ack-mode: record
  concurrency: 6
Run Code Online (Sandbox Code Playgroud)

日志

  13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Attempt …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-kafka

4
推荐指数
1
解决办法
2729
查看次数

标签 统计

apache-kafka ×1

spring-boot ×1

spring-kafka ×1