Iho*_* M. 10 java apache-kafka kafka-consumer-api
我有一个消费者工作者应用程序,内部正在启动X线程数,每个线程产生它的KafkaCosnumer.Cosnumers拥有groupId相同的主题并订阅相同的主题.因此,每个消费者都可以获得公平的分区份额.
处理的本质是我不能丢失消息,也不能允许重复.我正在运行的kafka版本是0.10.2.1.
这是我面临的问题:消费者线程1开始消费消息,然后poll()获取一批消息.我也实现了ConsumerRebalanceListener,所以每次成功处理消息时,它都会被添加到offsets地图中.(请参阅下面的代码.)因此,一旦重新平衡发生,我可以在将分区重新分配给其他使用者之前提交我的偏移量.有时,为了处理该批处理,需要更长的时间max.poll.interval.ms,这是重新平衡发生的地方,分区从消费者1中提取并分配给消费者2.消费者1不知道分区被撤销并继续处理消息,在同时,消费者2从最后一个偏移量(由RebalanceListener提交)中获取并处理相同的消息.
有没有办法通知消费者他已撤销分区,以便他可以停止处理已经分配给其他消费者的循环中的消息?
public class RebalanceListener<K, V> implements ConsumerRebalanceListener {
private final KafkaConsumer<K, V> consumer;
private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
Maps.newConcurrentMap();
private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);
public RebalanceListener(KafkaConsumer<K, V> consumer) {
this.consumer = consumer;
}
public void addOffset(String topic, int partition, long offset) {
LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
topic, partition, offset);
CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
new OffsetAndMetadata(offset, "commit"));
}
public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
return CURRENT_OFFSETS;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
partitions.stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
LOGGER.debug("message=Comitting offsets for partititions [{}]",
CURRENT_OFFSETS.keySet().stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
consumer.commitSync(CURRENT_OFFSETS);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
partitions.stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
}
}
Run Code Online (Sandbox Code Playgroud)
我想我可以在其中consumerId -- TopicPartition创建一个并发映射,RebalanceListener然后在处理每个消息之前检查当前消费者是否仍然与记录相关联(每个ConsumerRecord都有topic和partition字段).如果不是 - 打破循环并进行下一步poll().
如果我的工作者应用程序将在一个单独的实例中运行,即使有几个KafkaConsumer线程正在旋转,这将是一个可行的解决方案.但是一旦我扩展它,我将无法在静态地图中存储偏移量和consumer-topicPartition映射.那必须是某种集中存储,数据库,或者说,Redis.
但是,在每次处理项目之前,我都要问我的记录是否可以由当前的消费者线程合法处理.对于缩放的工作者应用程序,它将是对外部存储的网络调用,这将破坏使用kafka的目的,因为它将减慢处理速度.我可能会在处理单个项目后选择执行偏移提交.
您需要实现 onPartitionsRevoked()
保证所有消费者进程将在任何进程调用 onPartitionsAssigned 之前调用 onPartitionsRevoked。因此,如果偏移量或其他状态保存在 onPartitionsRevoked 调用中,则保证在接管该分区的进程调用其 onPartitionsAssigned 回调来加载状态时保存。