生成Kafka消息到选定的分区

pWo*_*Woz 14 apache-kafka

根据Kafka文件:

生产者负责选择将哪个消息分配给主题中的哪个分区.

因此我的主要问题是:

如何使用kafka-console-producer.sh(或Kafka Java客户端)将消息发送到所选分区?

我想在消息发送时指定某种"分区ID".可能这样的"分区ID"存储在Zookeeper中的某个地方.你知道哪一个值(在Zookeeper中)识别Kafka分区吗?

Ced*_*ury 16

到目前为止,ConsoleProducer似乎支持将键控消息写入主题.Kafka将使用密钥的散列将消息分发到分区中,至少使用默认行为.

目前,默认分隔符为\t,因此输入key[\t]message将在分区之间分配:

key1    a-message
Run Code Online (Sandbox Code Playgroud)

可以通过提供key.separator配置来更改分隔符,例如:

kafka-console-producer --broker-list localhost:9092,localhost:9093 \
  --topic mytopic --property key.separator=,
Run Code Online (Sandbox Code Playgroud)

发送这样的消息:

key2,another-message
Run Code Online (Sandbox Code Playgroud)

我已成功使用默认选项卡和自定义分隔符对此进行了测试.消息被分发到两个单独的分区.

  • 它也对我有用。值也可以包含分隔符。它只是查找第一个分隔符位置并将记录拆分为两个标记 (2认同)
  • 考虑使用类似 [kafkacat](https://github.com/edenhill/kafkacat/blob/master/README.md) 的东西。 (2认同)
  • kafka-console-consumer 接受 print.key 属性。您还可以自定义字符串来分隔输出中的键和值:key.separator。--property print.key=true --property key.separator=" - " (2认同)

ssi*_*ice 12

根据当前的状态(Kafka> = 0.10.0.1),kafka-console-producer.sh脚本和底层的ConsoleProducer java类支持使用密钥发送数据,但默认情况下禁用此类支持并且必须启用来自CLI.

也就是说,您需要设置属性parse.key.此外,如果您想使用与制表符不同的内容,请key.separator按照Cedric的答案中的说明使用.

最后,命令行将是:

kafka-console.producer.sh --broker-list kafka:9092,kafka2:9092 \
    --topic $TOPIC --property parse.key=true --property key.separator=|
Run Code Online (Sandbox Code Playgroud)


Chi*_*ron 6

这是您的出发点:
partitioner.class在您的Properties实例中进行设置.在Kafka中,默认实现是kafka.producer.DefaultPartitioner.

该设置的目标是:

用于在子主题之间对消息进行分区的分区器类.默认分区程序基于密钥的哈希值.

这意味着如果要更改默认分区程序的行为,则需要创建自己的kafka.producer.Partitioner接口实现.

我建议在创建自己的策略时要非常小心,真的,测试它并监控你的主题和分区.


Den*_*nko 3

kafka-console-producer.sh不支持开箱即用地向特定分区生成消息。

然而,更新脚本以传递分区 Id 的额外参数,然后在自定义分区器中处理它应该非常简单,如 @Chiron 在 kafka.tools.ConsoleProducer 类的修改版本中的帖子中所述。

查看源代码:

https://apache.googlesource.com/kafka/+/refs/heads/trunk/bin/kafka-console- Producer.sh https://apache.googlesource.com/kafka/+/refs/heads/trunk/core /src/main/scala/kafka/tools/ConsoleProducer.scala