Pet*_*fel 162 purge apache-kafka
我在本地计算机上将一条太大的消息推入了一个kafka消息主题,现在我收到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
Run Code Online (Sandbox Code Playgroud)
增加fetch.size
这里并不理想,因为我实际上并不想接受那么大的消息.有没有办法在卡夫卡中清除主题?
小智 349
暂时将主题的保留时间更新为一秒:
kafka-topics.sh --zookeeper <zkhost>:2181 --alter --topic <topic name> --config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)
在较新的Kafka版本中,您也可以使用它 kafka-configs --entity-type topics
kafka-configs.sh --zookeeper <zkhost>:2181 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)
然后等待清除生效(大约一分钟).清除后,恢复以前的retention.ms
值.
小智 58
要清除队列,您可以删除主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Run Code Online (Sandbox Code Playgroud)
然后重新创建它:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)
Tho*_*att 47
以下是我删除名为的主题的步骤MyTopic
:
rm -rf /tmp/kafka-logs/MyTopic-0
.重复其他分区和所有副本zkCli.sh
然后rmr /brokers/MyTopic
如果您错过了第3步,那么Apache Kafka将继续报告该主题(例如,如果您运行的话kafka-list-topic.sh
).
使用Apache Kafka 0.8.0进行测试.
Sha*_*rry 40
虽然接受的答案是正确的,但该方法已被弃用.现在应该通过主题配置完成kafka-configs
.
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
Run Code Online (Sandbox Code Playgroud)
可以使用该命令显示通过此方法设置的配置
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
Run Code Online (Sandbox Code Playgroud)
Pat*_*ick 36
在Kafka 0.8.2中测试,用于快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:
delete.topic.enable=true
Run Code Online (Sandbox Code Playgroud)
然后,您可以运行此命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Run Code Online (Sandbox Code Playgroud)
kha*_*ing 10
以下命令可用于删除 kafka 主题中的所有现有消息:
kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json
Run Code Online (Sandbox Code Playgroud)
delete.json 文件的结构应如下所示:
{ "partitions": [ { "topic": "foo", "partition": 1, "offset": -1 } ], "version": 1 }
其中 offset :-1 将删除所有记录(此命令已用 kafka 2.0.1 测试
kafka 没有用于清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。
首先确保 sever.properties 文件有,如果没有添加 delete.topic.enable=true
然后,删除主题
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
然后再次创建它。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
Run Code Online (Sandbox Code Playgroud)
小智 6
从kafka 1.1开始
清除主题
bin / kafka-configs.sh --zookeeper本地主机:2181-更改-实体类型主题->-实体名称tp_binance_kline --add-configtention.ms = 100
请等待1分钟,以确保kafka清除主题以删除配置,然后再使用默认值
bin / kafka-configs.sh --zookeeper本地主机:2181-更改-实体类型主题->-实体名称tp_binance_kline-删除配置保留.ms
在@steven appleyard 回答之后,我在 Kafka 2.2.0 上执行了以下命令,它们为我工作。
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
Run Code Online (Sandbox Code Playgroud)
更新:此答案与 Kafka 0.6 相关。对于 Kafka 0.8 及更高版本,请参阅@Patrick 的回答。
是的,停止kafka,手动删除对应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka 重启后主题将为空。
有时,如果您的集群已饱和(分区过多,或使用加密的主题数据,或使用 SSL,或控制器位于坏节点上,或连接不稳定,则清除所述主题将需要很长时间.
我遵循这些步骤,尤其是当您使用 Avro 时。
1:使用kafka工具运行:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
Run Code Online (Sandbox Code Playgroud)
2:在Schema注册节点上运行:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3:一旦主题为空,将主题保留设置回原始设置。
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
Run Code Online (Sandbox Code Playgroud)
希望这对某人有所帮助,因为它不容易宣传。
这里有很多很棒的答案,但在其中,我没有找到关于 docker 的答案。我花了一些时间才发现在这种情况下使用代理容器是错误的(显然!!!)
## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)
我应该使用zookeeper:2181
而不是--zookeeper localhost:2181
按照我的撰写文件
## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Run Code Online (Sandbox Code Playgroud)
正确的命令是
docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000
Run Code Online (Sandbox Code Playgroud)
希望它会节省某人的时间。
另外,请注意消息不会立即被删除,而是会在日志段关闭时发生。
归档时间: |
|
查看次数: |
159591 次 |
最近记录: |