在日志末尾时,如何停止尝试从 Kafka 消费消息?

Mat*_*ley 5 apache-kafka kafka-consumer-api

我有一个按计划创建的 Kafka 消费者。它尝试使用自上次提交以来添加的所有新消息。

我想在消费完日志中的所有新消息后关闭消费者,而不是无限期地等待新消息进入。

我无法通过 Kafka 的文档找到解决方案。

我在 Confluence.Kafka.ConsumerConfig 和 ClientConfig 类中看到许多与超时相关的属性,包括 FetchWaitMaxMs,但无法破译要使用哪些属性。我正在使用 .NET 客户端。

任何意见,将不胜感激。

Mat*_*ley 5

我找到了解决办法。Confluence 的 .NET Kafka 库 1.0.0-beta2 版本提供了一种名为.Consume(TimeSpan timeSpan). 如果没有新消息可供使用或者我们位于分区 EOF,则这将返回 null。我之前使用的.Consume(CancellationToken cancellationToken)过载会阻塞并阻止我关闭消费者。更多信息: https: //github.com/confluenceinc/confluence-kafka-dotnet/issues/614#issuecomment-433848857

另一个选择是升级到版本 1.0.0-beta3,它在 ConsumeResult 对象上提供一个名为 IsPartitionEOF 的布尔标志。这就是我最初想要的 - 一种知道我何时到达分区末尾的方法。


Mik*_*kis 2

我从未使用过 .NET 客户端,但假设它与 Java 客户端没有太大不同,该poll()方法应该接受以毫秒为单位的超时值,因此将其设置为5000在大多数情况下应该有效。无需摆弄配置类。

另一种方法是在创建消费者时找到最大偏移量,并且只读取该偏移量为止。理论上,如果消费者的消费速度不如生产者的生产速度,那么理论上这将阻止消费者无限期地运行。但我从未尝试过这种方法。