如何在不删除主题的情况下删除/清除Kafka排队的邮件

Tac*_*oma 16 apache-kafka

有没有办法删除队列消息而不删除Kafka主题?
我想在激活使用者时删除队列消息.

我知道有几种方式:

  1. 重置保留时间

    $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --config retention.ms=1000

  2. 删除kafka文件

    $ rm -rf /data/kafka-logs/<topic/Partition_name>

Han*_*sen 20

在0.11或更高版本中,您可以运行bin/kafka-delete-records.sh命令来标记要删除的邮件.

https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

例如,发布100条消息

seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
Run Code Online (Sandbox Code Playgroud)

然后使用新的kafka-delete-records.sh命令行工具删除这100条消息中的90条

./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
Run Code Online (Sandbox Code Playgroud)

其中包含offsetfile.json

 {"partitions": [{"topic": "mytest", "partition": 0, "offset": 90}], "version":1 }
Run Code Online (Sandbox Code Playgroud)

然后从头开始使用消息来验证100条消息中的90条确实标记为已删除.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100
Run Code Online (Sandbox Code Playgroud)

  • 不,这在卡夫卡中是不可能的。Kafka 有一个核心属性,那就是它是一个不可变的提交日志,允许任意范围删除将使其不那么不可变。 (2认同)

Ali*_*abi 8

要删除特定主题中的所有消息,可以运行 kafka-delete-records.sh

例如,我有一个名为的主题test,其中包含4 partitions

创建一个Json文件,例如j.json

{

"partitions": [

    {

        "topic": "test",

        "partition": 0,

        "offset": -1

    }, {

        "topic": "test",

        "partition": 1,

        "offset": -1

    }, {

        "topic": "test",

        "partition": 2,

        "offset": -1

    }, {

        "topic": "test",

        "partition": 3,

        "offset": -1

    }

],

"version": 1

}
Run Code Online (Sandbox Code Playgroud)

现在通过此命令删除所有消息:

/opt/kafka/confluent-4.1.1/bin/kafdelete-records --bootstrap-server 192.168.XX.XX:9092 --offset-json-file j.json
Run Code Online (Sandbox Code Playgroud)

执行命令后,将显示此消息

Records delete operation completed:
partition: test-0   low_watermark: 7
partition: test-1   low_watermark: 7
partition: test-2   low_watermark: 7
partition: test-3   low_watermark: 7
Run Code Online (Sandbox Code Playgroud)