Cur*_*ind 2 apache-kafka kafka-consumer-api apache-kafka-streams
我试图在一些关于 kafka 流(kafka 流客户端到 kafka)的细节方面了解 kafka。
我知道 KafkConsumer(java 客户端)将从 kafka 获取数据,但是我无法理解客户端轮询 kakfa 主题以获取数据的频率?
轮询的频率由您的代码定义,因为您负责调用 poll。一个非常简单的使用 KafkaConsumer 的用户代码示例如下
public class KafkaConsumerExample {
...
static void runConsumer() throws InterruptedException {
final Consumer<Long, String> consumer = createConsumer();
final int giveUp = 100; int noRecordsCount = 0;
while (true) {
final ConsumerRecords<Long, String> consumerRecords =
consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
record.key(), record.value(),
record.partition(), record.offset());
});
consumer.commitAsync();
}
consumer.close();
System.out.println("DONE");
}
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,频率由处理消息的持续时间定义consumerRecords.forEach。
但是,请记住,如果您没有“足够快”地调用 poll,您的消费者将被代理协调器视为已死亡,并且将触发重新平衡。这种“足够快”是由max.poll.interval.mskafka >= 0.10.1.0 中的属性决定的。有关更多详细信息,请参阅此答案。
max.poll.interval.ms默认值为五分钟,因此如果您consumerRecords.forEach花费的时间超过此时间,您的消费者将被视为死亡。
如果您不想KafkaConsumer直接使用原始数据,您可以使用alpakka kafka,这是一个以安全和背压方式(基于 akka 流)从消费和生产到 kafka 主题的库。
有了这个库,轮询的频率由配置决定akka.kafka.consumer.poll-interval。
我们说是安全的,因为它会继续轮询以避免消费者被视为死亡,即使您的处理无法跟上速度。它能够这样做是因为KafkaConsumer允许暂停消费者
/**
* Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
* any records from these partitions until they have been resumed using {@link #resume(Collection)}.
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void pause(Collection<TopicPartition> partitions) { ... }
Run Code Online (Sandbox Code Playgroud)
要完全理解这一点,您应该阅读 akka-streams 和背压。
| 归档时间: |
|
| 查看次数: |
5383 次 |
| 最近记录: |