卡夫卡消费者的多个话题

Apo*_*llo 18 java multithreading apache-kafka kafka-consumer-api

我有一个主题列表(现在是10个),其大小可以在未来增加.我知道我们可以生成多个线程(每个主题)来从每个主题中使用,但在我的情况下,如果主题数量增加,那么从主题中消耗的线程数量会增加,这是我不想要的,因为主题不是过于频繁地获取数据,因此线程将是理想的.

有没有办法让一个消费者从所有主题中消费?如果是,那我们怎样才能实现呢?卡夫卡还将如何维持这种抵消?请提出答案.

Sub*_*aha 14

我们可以使用以下API订阅多个主题:consumer.subscribe(Arrays.asList(topic1,topic2),ConsumerRebalanceListener obj)

消费者有主题信息,我们可以通过创建OffsetAndMetadata对象使用consumer.commitAsync或consumer.commitSync()进行如下操作.

ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
Run Code Online (Sandbox Code Playgroud)

  • 偏移量由您的应用提交,并存储在名为__consumer_offsets的特殊偏移量kafka主题中。偏移量会保留在每个主题的每个分区中,因此您订阅多少主题都没有关系。 (2认同)
  • 这似乎要求所有主题使用相同的序列化器。有什么办法可以在单个消费者轮询中允许每个主题使用不同的序列化器? (2认同)

小智 6

不需要多线程,你可以有一个消费者,从多个主题中消费。偏移量由 zookeeper 维护,因为 kafka-server 本身是无状态的。每当消费者消费一条消息时,它的偏移量都会与zookeeper一起提交,以保持未来的轨迹,每条消息只处理一次。因此,即使在 kafka 失败的情况下,消费者也会从上次提交的下一个偏移量开始消费。

  • 从 Kafka 0.9 及更高版本开始,偏移量存储在 Kafka 主题中而不是 zookeeper (16认同)