标签: kafka-producer-api

如何向受SSL保护的kafka主题发送消息

任何人都可以提出建议,使用java KafkaProducer,我们需要设置哪些属性来将消息发送到受SSL保护的kafka主题,这是kafka的新增功能,无法向受SSL保护的kafka发送一条消息

java ssl apache-kafka kafka-producer-api

0
推荐指数
1
解决办法
8599
查看次数

KafkaProducer 循环分配不适用于同一密钥

我试图了解卡夫卡是如何工作的。我读到,默认情况下,Kafka 会在分区之间以循环方式分发来自生产者的消息。

但是,如果消息具有相同的密钥,为什么这些消息总是放在同一个分区中?(未配置分区键策略)。

例如,使用下面的代码,消息始终放置在同一分区中:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String key = properties.getProperty("dev.id");
producer.send(new ProducerRecord<String, String>(properties.getProperty("kafka.topic"), key, value), new EventGeneratorCallback(key));
Run Code Online (Sandbox Code Playgroud)

使用不同的密钥,消息以循环方式分发:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String key = properties.getProperty("dev.id") + UUID.randomUUID().toString();
producer.send(new ProducerRecord<String, String>(properties.getProperty("kafka.topic"), key, value), new EventGeneratorCallback(key));
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api

0
推荐指数
1
解决办法
4874
查看次数

我们如何在kafka中快速写入单个消息(而不是批量)?

我是 Golang 和 Kafka 的新手,我正在使用 Segmentio kafka-go 使用 Golang 连接到 Kafka 服务器。截至目前,我想推送 Kafka 中用户的每个事件,所以我想推送单个消息(而不是批量),但由于该库提供的写入操作对于批量或单个消息需要相同的时间,因此需要很多时间。有没有什么方法可以快速编写单条消息,以便我可以在更短的时间内推送卡夫卡中的数百万个事件?

我已经对单条消息和批量消息进行了测试,它花费相同的时间(最短为 10 毫秒)。

go apache-kafka segment-io kafka-producer-api

0
推荐指数
1
解决办法
3129
查看次数

kafka 主题中理想的分区数是多少?

我正在学习 Kafka 并尝试为我最近的搜索应用程序创建一个主题。推送到 kafka 主题的数据被认为是一个很大的数字。

我的 kafka 集群有 3 个代理,并且已经为其他需求创建了主题。

现在我应该为最近的搜索主题选择多少个分区?如果我没有明确提供分区号怎么办?选择分区号需要考虑哪些事项?

apache-kafka kafka-consumer-api kafka-producer-api kafka-topic

0
推荐指数
1
解决办法
2340
查看次数

在控制台模式下运行时,Kafka 的并行概念是否适用?

所以,我是 Kafka 的新手,我已经阅读了一段时间。我在 confluent 上找到了这些信息。

https://docs.confluent.io/current/streams/architecture.html

所以我从这里了解到,假设我有一个名为 plain_text 的主题,我只是将一堆记录作为纯文本发送,而我只有一个具有单个主题和单个分区的代理。我现在启动 2 个消费者实例 ConsumerA 和 ConsumerB。由于我的分区计数小于消费者计数,因此只有一个消费者应该主动消费消息,而另一个消费者则处于空闲状态。如果我错了,请纠正我。

我使用 kafka-console-* 脚本进行了测试

启动一个zookeeper集群

bin/zookeeper-server-start.sh config/zookeeper.properties
Run Code Online (Sandbox Code Playgroud)

在 localhost:9092 上启动一个 kafka 代理

bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)

创建一个带有一个分区的主题纯文本

bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic plain_text
Run Code Online (Sandbox Code Playgroud)

开始制作人

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic plain_text
Run Code Online (Sandbox Code Playgroud)

启动属于同一组的 2 个消费者(运行相同的命令两次)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic plain_text \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property group.id=test_group \

Run Code Online (Sandbox Code Playgroud)

因此,两个消费者中的一个应该拥有该单个分区(如果我错了,请再次纠正我),但是我在生产者控制台上生成的任何内容在两个消费者控制台上都是可见的。为什么两个消费者都在使用来自单个分区的消息。我是否遗漏了什么或对 kafka-console-* 脚本执行了不同的规则。

apache-kafka kafka-consumer-api kafka-producer-api kafka-partition

0
推荐指数
1
解决办法
63
查看次数

消息在 Confluence Kafka Dotnet 中丢失

我在最近的 C# 项目中使用 Confluence kafka 包。我通过以下方式创建了一个生产者:

prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};

foreach(msg in msglist){
    using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
        producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
    }
}
Run Code Online (Sandbox Code Playgroud)

但问题是我的一些信息没有传达给消费者。他们在某个地方迷路了。但是,如果我对生产者使用await ,那么所有消息都会被传递。如何无需等待即可传递我的所有消息。(我有一个分区)

.net c# apache-kafka kafka-producer-api confluent-kafka-dotnet

0
推荐指数
1
解决办法
1787
查看次数