Spark Streaming中的Kafka消费者

Dil*_*eam 5 java apache-kafka apache-spark spark-streaming apache-zookeeper

尝试编写消耗来自Kafka的消息的Spark Streaming作业.这是我到目前为止的情况:

1)启动Zookeeper.
2)启动Kafka Server.
3)向服务器发送了一些消息.当我运行以下内容时,我可以看到它们:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 - topic mytopic --from-beginning

4)现在尝试编写一个程序来计算在5分钟内进入的消息数量.

代码看起来像这样:

    Map<String, Integer> map = new HashMap<String, Integer>();
    map.put("mytopic", new Integer(1));

    JavaStreamingContext ssc = new JavaStreamingContext(
            sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});


    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
Run Code Online (Sandbox Code Playgroud)

不确定第3个参数(使用者组)使用什么值.当我运行它时,我得到"无法连接到zookeeper服务器".但Zookeeper正在2181端口上运行; 否则步骤#3就行不通了.

好像我没有正确使用KafkaUtils.createStream.有任何想法吗?

Den*_*nko 2

不存在默认消费者组这样的东西。您可以在那里使用任意非空字符串。如果你只有一个消费者,那么它的消费者群体并不重要。如果有两个或多个消费者,他们可以属于同一消费者组,也可以属于不同的消费者组。

来自http://kafka.apache.org/documentation.html

消费者

...

如果所有消费者实例都有相同的消费者组,那么这就像传统的队列在消费者上平衡负载一样工作。

如果所有消费者实例都有不同的消费者组,那么这就像发布-订阅一样,所有消息都会广播给所有消费者。

我认为问题可能出在“主题”参数中。来自Spark 文档

要使用的 (topic_name -> numPartitions) 的映射。每个分区都在自己的线程中消耗

您仅为主题指定了一个分区,即“1”。根据代理的设置(num.partitions),可能有多个分区,并且您的消息可能会发送到您的程序无法读取的其他分区。

此外,我相信part​​itionIds是从0开始的。因此,如果您只有一个分区,则它的 id 等于 0。