Dil*_*eam 5 java apache-kafka apache-spark spark-streaming apache-zookeeper
尝试编写消耗来自Kafka的消息的Spark Streaming作业.这是我到目前为止的情况:
1)启动Zookeeper.
2)启动Kafka Server.
3)向服务器发送了一些消息.当我运行以下内容时,我可以看到它们:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 - topic mytopic --from-beginning
4)现在尝试编写一个程序来计算在5分钟内进入的消息数量.
代码看起来像这样:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
Run Code Online (Sandbox Code Playgroud)
不确定第3个参数(使用者组)使用什么值.当我运行它时,我得到"无法连接到zookeeper服务器".但Zookeeper正在2181端口上运行; 否则步骤#3就行不通了.
好像我没有正确使用KafkaUtils.createStream.有任何想法吗?
不存在默认消费者组这样的东西。您可以在那里使用任意非空字符串。如果你只有一个消费者,那么它的消费者群体并不重要。如果有两个或多个消费者,他们可以属于同一消费者组,也可以属于不同的消费者组。
来自http://kafka.apache.org/documentation.html:
消费者
...
如果所有消费者实例都有相同的消费者组,那么这就像传统的队列在消费者上平衡负载一样工作。
如果所有消费者实例都有不同的消费者组,那么这就像发布-订阅一样,所有消息都会广播给所有消费者。
我认为问题可能出在“主题”参数中。来自Spark 文档:
要使用的 (topic_name -> numPartitions) 的映射。每个分区都在自己的线程中消耗
您仅为主题指定了一个分区,即“1”。根据代理的设置(num.partitions),可能有多个分区,并且您的消息可能会发送到您的程序无法读取的其他分区。
此外,我相信partitionIds是从0开始的。因此,如果您只有一个分区,则它的 id 等于 0。
| 归档时间: |
|
| 查看次数: |
10538 次 |
| 最近记录: |