在KAFKA中消费后删除消息

Sha*_*Ali 33 apache-kafka kafka-consumer-api

我正在使用apache kafka来生成和使用5GB大小的文件.我想知道是否有一种方法可以在消费后自动删除主题中的消息.我有办法跟踪消费消息吗?我不想手动删除它.

Lun*_*ahl 39

在Kafka,消费者的责任是消费者的责任,这也是Kafka具有如此出色的横向可扩展性的主要原因之一.

使用高级消费者API将通过在Zookeeper中提交消耗的偏移量来自动执行此操作(或者通过特殊的Kafka主题使用更新的配置选项来跟踪消耗的消息).

简单的消费者API使您可以自己处理消费消息的跟踪方式和位置.

在Kafka中清除消息是通过指定主题的保留时间或为其定义磁盘配额自动完成的,因此对于一个5GB文件的情况,此文件将在您定义的保留期过后删除,无论如果它已被消费.

  • 对于您想要使用消息、修改它并将其推送回不同主题的情况,在消息被使用后立即删除它们是有意义的。否则,您最终会在保留期内获得所有内容的 2 份副本。如果您可以在第一个主题上设置保留期但删除已使用的消息,则这是理想的选择。 (3认同)
  • 您确定在报复政策到期后,即使消息没有被消费,数据也会从主题中删除吗?这意味着在使用给定分区中的数据时,消费者将看到“漏洞”或丢失的消息。这不是违背了kafka关于可靠消息传递媒介的承诺吗? (2认同)

Den*_*din 10

您无法在消费时删除 Kafka 消息。Kafka没有消息被消费时直接删除的机制。

我在尝试执行此操作时发现的最接近的东西是这个技巧,但它未经测试,并且根据设计,它不适用于最新的消息:

实现此目的的一个潜在技巧是使用 (a) 压缩主题和 (b) 自定义分区器 (c) 一对拦截器的组合。

该过程如下:

  1. 使用生产者拦截器在写入之前将 GUID 添加到密钥的末尾。
  2. 使用自定义分区程序忽略 GUID 进行分区
  3. 使用压缩主题,这样您就可以通过 Producer.send(key+GUID, null) 删除您需要的任何单个消息
  4. 使用消费者拦截器在读取时删除 GUID。

但您不应该需要此功能:

有 1 个或多个消费者,并且希望他们总共只消费一次消息?
将它们放在同一个消费组中。

想要避免太多消息填满磁盘吗?
根据磁盘空间和/或时间设置保留。


sye*_*eer 8

根据我的知识,您可以通过减少存储时间从日志中删除消耗的数据.日志的默认时间设置为168小时,然后数据将自动从您创建的Kafka-Topic中删除.因此,我的建议是减少转到server.properties位于配置文件夹中的位置,并将168更改为最短时间.因此,在为log.retention.hours设置的特定时间后,它们没有数据.因此,您的问题将得到解决.

log.retention.hours = 168

继续编码

  • 这不是OP问题的解决方案.它将删除任何消息,无论它们是否已被消费. (35认同)