哪个 kafka 属性决定了 KafkaConsumer 的轮询频率?

Cur*_*ind 2 apache-kafka kafka-consumer-api apache-kafka-streams

我试图在一些关于 kafka 流(kafka 流客户端到 kafka)的细节方面了解 kafka。

我知道 KafkConsumer(java 客户端)将从 kafka 获取数据,但是我无法理解客户端轮询 kakfa 主题以获取数据的频率?

gab*_*ssi 5

轮询的频率由您的代码定义,因为您负责调用 poll。一个非常简单的使用 KafkaConsumer 的用户代码示例如下

public class KafkaConsumerExample {
  ...


    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(1000);

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,频率由处理消息的持续时间定义consumerRecords.forEach

但是,请记住,如果您没有“足够快”地调用 poll,您的消费者将被代理协调器视为已死亡,并且将触发重新平衡。这种“足够快”是由max.poll.interval.mskafka >= 0.10.1.0 中的属性决定的。有关更多详细信息,请参阅此答案

max.poll.interval.ms默认值为五分钟,因此如果您consumerRecords.forEach花费的时间超过此时间,您的消费者将被视为死亡。

如果您不想KafkaConsumer直接使用原始数据,您可以使用alpakka kafka,这是一个以安全和背压方式(基于 akka 流)从消费和生产到 kafka 主题的库。
有了这个库,轮询的频率由配置决定akka.kafka.consumer.poll-interval
我们说是安全的,因为它会继续轮询以避免消费者被视为死亡,即使您的处理无法跟上速度。它能够这样做是因为KafkaConsumer允许暂停消费者

 /**
     * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
     * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
     * Note that this method does not affect partition subscription. In particular, it does not cause a group
     * rebalance when automatic assignment is used.
     * @param partitions The partitions which should be paused
     * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) { ... }
Run Code Online (Sandbox Code Playgroud)

要完全理解这一点,您应该阅读 akka-streams 和背压。