清除卡夫卡主题

Pet*_*fel 162 purge apache-kafka

我在本地计算机上将一条太大的消息推入了一个kafka消息主题,现在我收到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size
Run Code Online (Sandbox Code Playgroud)

增加fetch.size这里并不理想,因为我实际上并不想接受那么大的消息.有没有办法在卡夫卡中清除主题?

小智 349

暂时将主题的保留时间更新为一秒:

kafka-topics.sh --zookeeper <zkhost>:2181 --alter --topic <topic name> --config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)

在较新的Kafka版本中,您也可以使用它 kafka-configs --entity-type topics

kafka-configs.sh --zookeeper <zkhost>:2181 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)

然后等待清除生效(大约一分钟).清除后,恢复以前的retention.ms值.

  • 从0.9.0开始,使用kafka-topics.sh更改配置已被弃用.新选项是使用kafka-configs.sh脚本.`例如kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms = 1000`这也允许你检查当前保留期,例如kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name> (52认同)
  • 我不知道有关检查当前的配置,但我相信重置回默认的样子:`斌/ kafka-topics.sh --zookeeper本地主机:2181 --alter --topic MyTopic --deleteConfig retention.ms` (26认同)
  • 或者取决于版本:`--delete-config retention.ms` (15认同)
  • 在 2.8.0 版本中 `--zookeeper` 也被弃用。最好改用引导服务器。`kafka-configs.sh --bootstrap-server &lt;bstserver&gt;:9091 --entity-type topic --alter --entity-name &lt;主题名称&gt; --add-config replacement.ms=1000` (8认同)
  • 这是一个很好的答案,但是请您添加说明如何开始检查主题的当前retention.ms值? (6认同)
  • .只是一个供参考,对于卡夫卡0.9.0.0 V,它说:Ubuntu的@ IP-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$斌/ kafka-topics.sh - -zookeeper localhost:2181 --alter --topic room-data --config retention.ms = 1000警告:不推荐使用此脚本更改主题配置,并且可能会在将来的版本中将其删除.接下来,请使用kafka-configs.sh来实现此功能 (3认同)
  • @AndrewNorman是正确的。此答案不会删除整个主题。它只会删除一些记录,甚至不保证删除超过 1 秒的记录。为什么?Kafka记录存储在段中,活动日志段永远不会被删除,即使它有超过retention.ms的记录。 (3认同)

小智 58

要清除队列,您可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Run Code Online (Sandbox Code Playgroud)

然后重新创建它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)

  • 记得在`config/server.properties`文件中添加一行`delete.topic.enable = true`,因为上述命令打印的警告说"注意:如果delete.topic.enable未设置为true.` (13认同)
  • 这并不总是即时的。有时它只会标记为删除,实际删除将在以后发生。 (4认同)

Tho*_*att 47

以下是我删除名为的主题的步骤MyTopic:

  1. 描述主题,而不是代理商ID
  2. 停止列出的每个代理ID的Apache Kafka守护程序.
  3. 连接到每个代理,并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0.重复其他分区和所有副本
  4. 删除主题元数据:zkCli.sh然后rmr /brokers/MyTopic
  5. 为每台已停止的计算机启动Apache Kafka守护程序

如果您错过了第3步,那么Apache Kafka将继续报告该主题(例如,如果您运行的话kafka-list-topic.sh).

使用Apache Kafka 0.8.0进行测试.

  • 在0.8.1`./ zookeeper-shell.sh localhost:2181`和`./kafka-topics.sh --list --zookeeper localhost:2181`中 (2认同)
  • 这将删除主题,而不是其中的数据。这需要停止 Broker。这充其量只是一个黑客。Steven Appleyard 的回答确实是最好的。 (2认同)
  • 在Kafka 0.8.2.1为我工作,虽然在zookeeper中的topis在/ brokers/topics/<主题名称> (2认同)

Sha*_*rry 40

虽然接受的答案是正确的,但该方法已被弃用.现在应该通过主题配置完成kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
Run Code Online (Sandbox Code Playgroud)

可以使用该命令显示通过此方法设置的配置

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
Run Code Online (Sandbox Code Playgroud)

  • 还值得添加:`kafka-configs --zookeeper localhost:2181-实体类型主题--alter-删除配置保留.ms-实体名称MyTopic` (2认同)

Pat*_*ick 36

在Kafka 0.8.2中测试,用于快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:

delete.topic.enable=true
Run Code Online (Sandbox Code Playgroud)

然后,您可以运行此命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Run Code Online (Sandbox Code Playgroud)


kha*_*ing 10

以下命令可用于删除 kafka 主题中的所有现有消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json
Run Code Online (Sandbox Code Playgroud)

delete.json 文件的结构应如下所示:

{ "partitions": [ { "topic": "foo", "partition": 1, "offset": -1 } ], "version": 1 }

其中 offset :-1 将删除所有记录(此命令已用 kafka 2.0.1 测试


Man*_*wal 6

kafka 没有用于清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。

首先确保 sever.properties 文件有,如果没有添加 delete.topic.enable=true

然后,删除主题 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

然后再次创建它。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
Run Code Online (Sandbox Code Playgroud)


小智 6

从kafka 1.1开始

清除主题

bin / kafka-configs.sh --zookeeper本地主机:2181-更改-实体类型主题->-实体名称tp_binance_kline --add-configtention.ms = 100

请等待1分钟,以确保kafka清除主题以删除配置,然后再使用默认值

bin / kafka-configs.sh --zookeeper本地主机:2181-更改-实体类型主题->-实体名称tp_binance_kline-删除配置保留.ms


abb*_*bas 6

在@steven appleyard 回答之后,我在 Kafka 2.2.0 上执行了以下命令,它们为我工作。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
Run Code Online (Sandbox Code Playgroud)

  • 这似乎与其他答案重复 (2认同)

Wil*_*ire 5

更新:此答案与 Kafka 0.6 相关。对于 Kafka 0.8 及更高版本,请参阅@Patrick 的回答。

是的,停止kafka,手动删除对应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka 重启后主题将为空。


Ben*_*lan 5

有时,如果您的集群已饱和(分区过多,或使用加密的主题数据,或使用 SSL,或控制器位于坏节点上,或连接不稳定,则清除所述主题将需要很长时间.

我遵循这些步骤,尤其是当您使用 Avro 时。

1:使用kafka工具运行:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
Run Code Online (Sandbox Code Playgroud)

2:在Schema注册节点上运行:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3:一旦主题为空,将主题保留设置回原始设置。

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
Run Code Online (Sandbox Code Playgroud)

希望这对某人有所帮助,因为它不容易宣传。


Vla*_*kin 5

这里有很多很棒的答案,但在其中,我没有找到关于 docker 的答案。我花了一些时间才发现在这种情况下使用代理容器是错误的(显然!!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)

我应该使用zookeeper:2181而不是--zookeeper localhost:2181按照我的撰写文件

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)

正确的命令是

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000
Run Code Online (Sandbox Code Playgroud)

希望它会节省某人的时间。

另外,请注意消息不会立即被删除,而是会在日志段关闭时发生。