我开始学习Kafka用于企业解决方案.
在我的阅读中,我想到了一些问题:
每个消费者组在代理上是否有相应的分区,或者每个消费者都有一个分区?
作为经纪人创建的分区,因此不关心消费者?
由于这是一个每个分区都有一个偏移量的队列,因此消费者有责任指定它想要读取哪些消息吗?是否需要保存其状态?
从队列中删除邮件时会发生什么? - 例如:保留时间为3小时,然后时间过去了,两侧的偏移量如何处理?
我想在开始生产和消费工作之前确保kafka服务器是否正在运行.它是在windows环境中,这是我的kafka服务器在eclipse中的代码...
Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();
Run Code Online (Sandbox Code Playgroud)
在这种情况下if (server != null)是不够的,因为它总是如此.那么有没有办法知道我的kafka服务器正在运行并为生产者做好准备.我有必要检查一下,因为它会导致丢失一些起始数据包.
谢谢.
我想在Kafka中加载一个简单的文本文件而不是标准输入.下载Kafka后,我执行了以下步骤:
动物园管理员:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动服务器
bin/kafka-server-start.sh config/server.properties
创建了一个名为"test"的主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
跑到制片人:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test1
Test2
Run Code Online (Sandbox Code Playgroud)
听取消费者的意见:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2
Run Code Online (Sandbox Code Playgroud)
我希望将数据文件甚至简单的文本文件传递给Producer,而不是标准输入,消费者可以直接看到它.真的很感激任何帮助.谢谢!
在卡夫卡创建主题的最佳途径是什么?
在新的生产者API中,当我尝试将消息发布到不存在的主题时,它首次失败然后成功发布.
我正在寻找关于这个问题的一些澄清.在Kafka文档中,我发现了以下内容:
Kafka仅对分区内的消息提供总订单,而不是在主题中的不同分区之间.对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了.但是,如果您需要对邮件进行总订单,则可以使用仅包含一个分区的主题来实现,但这意味着每个使用者组只有一个使用者进程.
所以这是我的问题:
这是否意味着如果我想拥有超过1个消费者(来自同一组)从一个主题中读取我需要超过1个分区?
这是否意味着我需要相同数量的分区作为同一组的消费者数量?
有多少消费者可以从一个分区读取?
关于API的关键和分区之间的关系也有一些问题.我只看了.net API(特别是来自MS的API),但看起来像模仿Java API.我看到当使用生产者向主题发送消息时,有一个关键参数.但是,当消费者从主题中读取时,存在分区号.
提前致谢.
我正在调查Kafka 9作为业余爱好项目,并完成了一些"Hello World"类型示例.
我必须考虑基于请求响应消息传递的Real World Kafka应用程序,更具体地说,如何将Kafka请求消息链接到其响应消息.
我正在考虑使用生成的UUID作为请求消息密钥,并将此请求UUID用作关联的响应消息密钥.与WebSphere MQ具有消息关联ID的机制大致相同.
我的结束2结束过程将是.
1).Kafka客户端生成随机UUID并发送单个Kafka请求消息.2).服务器将使用此请求消息提取并存储请求UUID值3).使用消息有效内容完成业务流程.4).响应响应消息,该消息使用来自请求消息的存储的UUID值作为响应消息Key.5).Kafka客户端轮询响应主题,直到它超时或检索具有原始请求UUID值的消息.
我关注的是Kafka Consumer轮询将从响应主题中删除其他客户端消息,并增加偏移量,使其他客户端失败.
我是否尝试在一个用例中应用Kafka它从未设计过?
是否可以在Kafka中实现请求/响应消息传递?
该KafkaProducer发送方法都返回一个Future并接受回调.
使用一种机制而不是另一种机制在完成发送后执行操作之间是否有任何根本区别?
我有一个用例,我需要使用 Kafka Console Producer 发送键值消息。那么如何通过Kafka Console Producer命令来实现呢?
我正在使用Kafka生产者客户端,我的项目中没有任何log4j配置.
在运行时,程序会打印很多我真正不想要的Kafka Debug日志.
所以,我尝试添加一个log4j.properties来将日志级别设置为ERROR,如下所示,这似乎不起作用:
log4j.rootLogger=ERROR
Run Code Online (Sandbox Code Playgroud)
我如何更改Kafka日志级别?
我必须在 kafka 中使用 SSL 添加加密和身份验证。
这就是我所做的:
为每个broker kafka生成证书:
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
创建 CA。生成的 CA 是一个公私钥对和用于签署其他证书的证书。CA 负责签署证书。
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
使用生成的 CA 签署所有代理证书 从密钥库导出证书:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
用 CA 签名:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
将 CA 的证书和签名的证书都导入密钥库:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost …