Nim*_*981 5 spring apache-kafka spring-kafka
我使用下面的示例来使用 spring Kafka 消费者读取消息。我的用例要求每次生成消息时,侦听器每次都从头开始读取。
@KafkaListener(
id = "grouplistener",
topicPartitions = {
@TopicPartition(
topic = "mycompactedtopic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
)
}
)
public void onReceiving(
String payload, @Header(KafkaHeaders.OFFSET) Integer offset,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
) {
log.info(
"Processing topic = {}, partition = {}, offset = {}, payload= {}",
topic, partition, offset, payload
);
}
Run Code Online (Sandbox Code Playgroud)
我似乎只能让它在应用程序启动时从头开始读取,然后它通常只会消耗接下来的消息。
有没有办法强迫它每次都寻求开始?
使用具有 1 个分区的压缩主题来保存配置列表。然后需要由休息端点调用,并且它应该显示完整的唯一配置列表
您应该实现此目的的方法是使用 Kafka Streams 和 KTable 并在 REST 层后面设置交互式查询。不是需要倒回自身以获得系统最新状态的标准消费者。
Kafka Connect 框架中已经存在这样的示例,其中它有一个配置主题,您只能访问 的最新值GET /connectors/name/config,并且只有当您重新启动它或扩展到更多实例时,它才会再次消耗所有消息。模式注册表也是一个例子,它存储主题中所有模式的内部哈希图_schemas,并具有用于读取、插入、删除的 REST API
本质上,当您获得给定键的新配置时,您可以用全新的值“替换”给定键的旧值,也可以以某种方式将旧值与新数据“合并”。
| 归档时间: |
|
| 查看次数: |
13470 次 |
| 最近记录: |