具有高级消费者 API 的 Kafka 偏移提交请求

vin*_*yag 5 java apache-kafka

我想使用 Kafka 高级消费者 API,同时我想禁用偏移量的自动提交。我试图通过以下步骤来实现这一点。

1) auto.commit.enable = false
2) offsets.storage = kafka
3) dual.commit.enabled = false
Run Code Online (Sandbox Code Playgroud)

我创建了一个偏移管理器,它会定期向 kafka 创建偏移提交请求并提交偏移。

我还有以下问题

1) 高级消费者 API 是否会自动从 kafka 存储中获取偏移量并使用该偏移量初始化自身?或者我应该使用简单的消费者 API 来实现这一点?

2)是否在所有经纪人中都复制了基于 kafka 的抵消存储?或者它只在一个经纪人上维护?

小智 0

我创建了一个偏移管理器,它定期向 kafka 创建 offsetcommit 请求并提交偏移。

如果您使用的是高级消费者,您不需要这样做,它为您提供了手动提交偏移量的方法,javadoc 在手动偏移控制下)为您提供了如何执行此操作的示例。

1)高级消费者API是否自动从kafka存储中获取偏移量并用该偏移量初始化自身?或者我应该使用简单的消费者 API 来实现这一点?

当您重新启动高级消费者时,高级消费者将负责获取最后提交的偏移量,因此您可以从上次中断的位置继续消费。

2)基于 kafka 的偏移量存储是否在所有代理之间复制?或者它仅由一个经纪商维护?

Kafka 将消费者偏移量存储在名为 的内部主题中__consumer_offsets,默认情况下其复制因子设置为 3,有 50 个分区。因此它被复制到 3 个经纪商中。您可以在代理配置offset中找到有关其配置的更多信息,它们以或开头offsets