标签: kafka-producer-api

了解Kafka主题和分区

我开始学习Kafka用于企业解决方案.

在我的阅读中,我想到了一些问题:

  1. 当一个制作人正在制作一条消息时 - 它会指定它想要发送消息的主题,是吗?它关心分区吗?
  2. 当订阅者正在运行时 - 它是否指定了其组ID,以便它可以是同一主题的消费者群集的一部分,或者是该群体消费者感兴趣的几个主题?
  3. 每个消费者组在代理上是否有相应的分区,或者每个消费者都有一个分区?

  4. 作为经纪人创建的分区,因此不关心消费者?

  5. 由于这是一个每个分区都有一个偏移量的队列,因此消费者有责任指定它想要读取哪些消息吗?是否需要保存其状态?

  6. 从队列中删除邮件时会发生什么? - 例如:保留时间为3小时,然后时间过去了,两侧的偏移量如何处理?

apache-kafka kafka-consumer-api kafka-producer-api

136
推荐指数
3
解决办法
5万
查看次数

如何检查Kafka Server是否正在运行?

我想在开始生产和消费工作之前确保kafka服务器是否正在运行.它是在windows环境中,这是我的kafka服务器在eclipse中的代码...

Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");    
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);        
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();
Run Code Online (Sandbox Code Playgroud)

在这种情况下if (server != null)是不够的,因为它总是如此.那么有没有办法知道我的kafka服务器正在运行并为生产者做好准备.我有必要检查一下,因为它会导致丢失一些起始数据包.

谢谢.

java apache-kafka kafka-producer-api

39
推荐指数
5
解决办法
7万
查看次数

如何将文件写入Kafka Producer

我想在Kafka中加载一个简单的文本文件而不是标准输入.下载Kafka后,我执行了以下步骤:

动物园管理员:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动服务器

bin/kafka-server-start.sh config/server.properties

创建了一个名为"test"的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

跑到制片人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2
Run Code Online (Sandbox Code Playgroud)

听取消费者的意见:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2
Run Code Online (Sandbox Code Playgroud)

我希望将数据文件甚至简单的文本文件传递给Producer,而不是标准输入,消费者可以直接看到它.真的很感激任何帮助.谢谢!

file apache-kafka kafka-consumer-api kafka-producer-api

29
推荐指数
3
解决办法
5万
查看次数

如何在apache kafka中创建主题?

在卡夫卡创建主题的最佳途径是什么?

  • 创建主题时要定义多少副本/分区?

在新的生产者API中,当我尝试将消息发布到不存在的主题时,它首次失败然后成功发布.

  • 我想知道,副本,分区和集群节点数之间的关系.
  • 我们是否需要在发布消息之前创建主题?

apache-kafka kafka-consumer-api kafka-producer-api

20
推荐指数
3
解决办法
4万
查看次数

主题,分区和密钥

我正在寻找关于这个问题的一些澄清.在Kafka文档中,我发现了以下内容:

Kafka仅对分区内的消息提供总订单,而不是在主题中的不同分区之间.对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了.但是,如果您需要对邮件进行总订单,则可以使用仅包含一个分区的主题来实现,但这意味着每个使用者组只有一个使用者进程.

所以这是我的问题:

  1. 这是否意味着如果我想拥有超过1个消费者(来自同一组)从一个主题中读取我需要超过1个分区?

  2. 这是否意味着我需要相同数量的分区作为同一组的消费者数量?

  3. 有多少消费者可以从一个分区读取?

关于API的关键和分区之间的关系也有一些问题.我只看了.net API(特别是来自MS的API),但看起来像模仿Java API.我看到当使用生产者向主题发送消息时,有一个关键参数.但是,当消费者从主题中读取时,存在分区号.

  1. 分区是如何编号的?从0或1开始?
  2. 密钥和分区之间究竟有什么关系?据我所知,键上的某些功能将决定一个分区.那是对的吗?
  3. 如果我在一个主题中有2个分区,并希望某些特定消息转到一个分区而其他消息转到另一个分区,那么我应该为一个特定分区使用特定密钥,其余部分用于另一个分区?
  4. 如果我有3个分区和一种类型的消息到一个特定的分区,其余的到另外2个怎么办?
  5. 一般来说,我如何向特定分区发送消息,以便了解消费者从何处阅读?或者我最好有多个主题?

提前致谢.

apache-kafka kafka-consumer-api kafka-producer-api

20
推荐指数
2
解决办法
2万
查看次数

Kafka是否支持请求响应消息传递

我正在调查Kafka 9作为业余爱好项目,并完成了一些"Hello World"类型示例.

我必须考虑基于请求响应消息传递的Real World Kafka应用程序,更具体地说,如何将Kafka请求消息链接到其响应消息.

我正在考虑使用生成的UUID作为请求消息密钥,并将此请求UUID用作关联的响应消息密钥.与WebSphere MQ具有消息关联ID的机制大致相同.

我的结束2结束过程将是.

1).Kafka客户端生成随机UUID并发送单个Kafka请求消息.2).服务器将使用此请求消息提取并存储请求UUID值3).使用消息有效内容完成业务流程.4).响应响应消息,该消息使用来自请求消息的存储的UUID值作为响应消息Key.5).Kafka客户端轮询响应主题,直到它超时或检索具有原始请求UUID值的消息.

我关注的是Kafka Consumer轮询将从响应主题中删除其他客户端消息,并增加偏移量,使其他客户端失败.

我是否尝试在一个用例中应用Kafka它从未设计过?

是否可以在Kafka中实现请求/响应消息传递?

mom apache-kafka kafka-consumer-api kafka-producer-api

19
推荐指数
1
解决办法
1万
查看次数

KafkaProducer:`callback`和返回'Future`之间的区别?

KafkaProducer发送方法都返回一个Future并接受回调.

使用一种机制而不是另一种机制在完成发送后执行操作之间是否有任何根本区别?

java apache-kafka kafka-producer-api

19
推荐指数
2
解决办法
8040
查看次数

如何使用 kafka 控制台生产者发送键值消息

我有一个用例,我需要使用 Kafka Console Producer 发送键值消息。那么如何通过Kafka Console Producer命令来实现呢?

apache-kafka kafka-consumer-api kafka-producer-api

19
推荐指数
2
解决办法
2万
查看次数

如何为Kafka生产者配置日志记录?

我正在使用Kafka生产者客户端,我的项目中没有任何log4j配置.

在运行时,程序会打印很多我真正不想要的Kafka Debug日志.

所以,我尝试添加一个log4j.properties来将日志级别设置为ERROR,如下所示,这似乎不起作用:

log4j.rootLogger=ERROR
Run Code Online (Sandbox Code Playgroud)

我如何更改Kafka日志级别?

logging log4j apache-kafka kafka-producer-api

17
推荐指数
2
解决办法
3万
查看次数

kafka 身份验证失败,原因是:SSL 握手失败

我必须在 kafka 中使用 SSL 添加加密和身份验证。

这就是我所做的:

  1. 为每个broker kafka生成证书:

    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey

  2. 创建 CA。生成的 CA 是一个公私钥对和用于签署其他证书的证书。CA 负责签署证书。

    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

  3. 使用生成的 CA 签署所有代理证书 从密钥库导出证书:

    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file

    用 CA 签名:

    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

  4. 将 CA 的证书和签名的证书都导入密钥库:

    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert

    keytool -keystore server.keystore.jks -alias localhost …

apache-kafka kafka-consumer-api kafka-producer-api

17
推荐指数
1
解决办法
2万
查看次数