我正在探索设置kafka的几个选项,我知道Zookeeper必须启动并运行才能启动kafka.
我想知道如何找到以下内容.
1)我的zookeeper实例的主机名和端口---我检查了zoo.cfg,我只能找到ClientPort而不是主机名,hostname是我的盒子的主机名吗?
2)检查ZooKeeper是否正常运行---我试图做一个ps -ef | grep "zoo"我找不到的东西.可能是我用错了关键词来搜索?
任何帮助将非常感激?
我是 kafka 和 zookepper 的新手,我正在尝试创建一个主题,但我收到此错误 -
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:517)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)
我正在使用此命令来创建主题 -
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partions 1 --topic TestTopic
Run Code Online (Sandbox Code Playgroud) 所有的例子的卡夫卡 | 生产者显示ProducerRecord的键/值对不仅是相同的类型(所有示例都显示<String,String>),而是相同的值.例如:
producer.send(new ProducerRecord<String, String>("someTopic", Integer.toString(i), Integer.toString(i)));
Run Code Online (Sandbox Code Playgroud)
但是在Kafka文档中,我似乎无法找到解释键/值概念(及其基本目的/效用)的位置.在传统的消息传递(ActiveMQ,RabbitMQ等)中,我总是在特定的主题/队列/交换中发出消息.但Kafka是第一个似乎需要键/值对的代理,而不仅仅是一个正常的'字符串消息.
所以我问:要求生产者发送KV对的目的/用途是什么?
messaging message-queue key-value messagebroker apache-kafka
我正在学习卡夫卡,在这里阅读介绍部分
https://kafka.apache.org/documentation.html#introduction
特别是有关消费者的部分.在引言的倒数第二段中,它读到了
卡夫卡做得更好.通过在主题中具有并行性概念 - 分区 - ,Kafka能够在消费者流程池中提供订购保证和负载平衡.这是通过将主题中的分区分配给使用者组中的使用者来实现的,以便每个分区仅由该组中的一个使用者使用.通过这样做,我们确保使用者是该分区的唯一读者并按顺序使用数据.由于有许多分区,这仍然可以平衡许多消费者实例的负载.但请注意,除分区之外不能有更多的消费者实例.
我的困惑源于最后一句话,因为在该段落的正上方,作者描绘了两个消费者群体和一个4分区主题,消费者实例多于分区!
没有比分区更多的消费者实例也没有意义,因为那时分区将非常小,并且似乎为每个消费者实例创建新分区的开销会使Kafka陷入困境.我知道分区用于容错并减少任何一台服务器上的负载,但上述句子在分布式系统的环境中没有意义,该分布式系统应该能够一次处理数千个消费者.
我正在尝试学习消息传递系统.我发现RabbitMq和NServiceBus在很少的地方一起使用.我的问题是
我没有在kafka上工作过多,但想在GCE中构建数据管道.所以我们想知道Kafka vs PUB/Sub.基本上我想知道如何在Kafka和Pub/sub中维护消息一致性,消息可用性,消息可靠性
谢谢
使用Kafka 0.8.1.1,如何在运行时更改日志保留时间?该文件说,财产是log.retention.hours,而是试图用它来改变kafka-topics.sh返回此错误
$ bin/kafka-topics.sh --zookeeper zk.yoursite.com --alter --topic as-access --config topic.log.retention.hours=24
Error while executing topic command requirement failed: Unknown configuration "topic.log.retention.hours".
java.lang.IllegalArgumentException: requirement failed: Unknown configuration "topic.log.retention.hours".
at scala.Predef$.require(Predef.scala:145)
at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:138)
at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:137)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JEnumerationWrapper.foreach(JavaConversions.scala:479)
at kafka.log.LogConfig$.validateNames(LogConfig.scala:137)
at kafka.log.LogConfig$.validate(LogConfig.scala:145)
at kafka.admin.TopicCommand$.parseTopicConfigsToBeAdded(TopicCommand.scala:171)
at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:95)
at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:93)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:93)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:52)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud) 我们如何使用API从IDE在Kafka中创建主题,因为当我这样做时:
bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181
Run Code Online (Sandbox Code Playgroud)
我收到错误:
bash: bin/kafka-create-topic.sh: No such file or directory
Run Code Online (Sandbox Code Playgroud)
我按照原样关注了开发人员设置.
使用kubernetes-kafka作为minikube的起点.
这使用StatefulSet和无头服务在集群内进行服务发现.
目标是在外部公开个人Kafka经纪人,这些经纪人在内部被称为:
kafka-0.broker.kafka.svc.cluster.local:9092
kafka-1.broker.kafka.svc.cluster.local:9092
kafka-2.broker.kafka.svc.cluster.local:9092
Run Code Online (Sandbox Code Playgroud)
限制是该外部服务能够专门解决代理.
什么是正确的(或一种可能的)方式?是否可以公开外部服务kafka-x.broker.kafka.svc.cluster.local:9092?
apache-kafka ×10
config ×1
distributed ×1
hadoop ×1
java ×1
key-value ×1
kubernetes ×1
messaging ×1
nservicebus ×1
ps ×1
rabbitmq ×1
retention ×1