Getting the topic name from a Kafka message

Aru*_*air 5 java cassandra apache-kafka apache-spark

How can I identify the topic name for each message consumed from Kafka?

import java.util.HashMap;
import java.util.Map;

import kafka.message.MessageAndMetadata;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

// Map each topic to the number of consumer threads.
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = { "test", "test1", "test2" };
for (String t : topics) {
    topicMap.put(t, 3);
}

SparkConf conf = new SparkConf().setAppName("KafkaReceiver")
        .set("spark.streaming.receiver.writeAheadLog.enable", "false")
        .setMaster("local[4]")
        .set("spark.cassandra.connection.host", "localhost");
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));

/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
        .createStream(jssc, "localhost:2181", "test-group", topicMap);

JavaDStream<MessageAndMetadata> data = messages
        .map(new Function<Tuple2<String, String>, MessageAndMetadata>() {
            public MessageAndMetadata call(Tuple2<String, String> message) {
                System.out.println("message =" + message._2);
                // The receiver-based stream only delivers (key, value) pairs,
                // so there is no metadata (and no topic name) to return here.
                return null;
            }
        });

I can receive messages from the Kafka producer, but since the consumer now consumes three topics, I need to identify which topic each message came from.

Pie*_*age 0

Unfortunately, this is not straightforward, because Spark's KafkaReceiver and ReliableKafkaReceiver store only MessageAndMetadata.key and the message itself; the topic name is dropped at the point where each record is stored.


There are two unresolved tickets about this problem in Spark's JIRA; they have been open for a while.
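If upgrading to the direct (receiver-free) API introduced in Spark 1.3 is an option, `KafkaUtils.createDirectStream` sidesteps the limitation: its messageHandler argument is given the full MessageAndMetadata, topic included. A minimal sketch, assuming Spark 1.3+, a broker at localhost:9092, and that each topic has a single partition read from offset 0:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

// This overload needs explicit starting offsets; here each topic's
// partition 0 starts from the beginning (adjust to your partition layout).
Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
for (String t : new String[] { "test", "test1", "test2" }) {
    fromOffsets.put(new TopicAndPartition(t, 0), 0L);
}

@SuppressWarnings("unchecked")
Class<Tuple2<String, String>> recordClass =
        (Class<Tuple2<String, String>>) (Class<?>) Tuple2.class;

JavaInputDStream<Tuple2<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        recordClass,
        kafkaParams,
        fromOffsets,
        new Function<MessageAndMetadata<String, String>, Tuple2<String, String>>() {
            public Tuple2<String, String> call(MessageAndMetadata<String, String> mmd) {
                // The handler sees the full metadata, so the topic is available.
                return new Tuple2<String, String>(mmd.topic(), mmd.message());
            }
        });

Otherwise, the receiver path has to be patched by hand, as follows.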


A dirty copy/paste/modify of Spark's source code solves your problem:

package org.apache.spark.streaming.kafka

import java.lang.{Integer => JInt}
import java.util.{Map => JMap, Properties}

import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.ThreadUtils
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.reflect._

object MoreKafkaUtils {

  def createStream(
    jssc: JavaStreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: JMap[String, JInt],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): JavaReceiverInputDStream[(String, String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(jssc.ssc.conf)
    new KafkaInputDStreamWithTopic[String, String, StringDecoder, StringDecoder](
      jssc.ssc, kafkaParams, topics.asScala.mapValues(_.intValue()), walEnabled, storageLevel)
  }

}

private[streaming]
class KafkaInputDStreamWithTopic[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_] : ClassTag,
  T <: Decoder[_] : ClassTag](
    @transient ssc_ : StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[(K, V, String)](ssc_) with Logging {

  def getReceiver(): Receiver[(K, V, String)] = {
    if (!useReliableReceiver) {
      new KafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      // NOTE: ReliableKafkaReceiverWithTopic is not shown here; adapt it from
      // Spark's ReliableKafkaReceiver in the same way if the write-ahead log
      // is enabled.
      new ReliableKafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }
}

private[streaming]
class KafkaReceiverWithTopic[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_] : ClassTag,
  T <: Decoder[_] : ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ) extends Receiver[(K, V, String)](storageLevel) with Logging {

  // Connection to Kafka
  var consumerConnector: ConsumerConnector = null

  def onStop() {
    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }
  }

  def onStart() {

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

    // Kafka connection properties
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))

    val zkConnect = kafkaParams("zookeeper.connect")
    // Create the connection to the cluster
    logInfo("Connecting to Zookeeper: " + zkConnect)
    val consumerConfig = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConfig)
    logInfo("Connected to " + zkConnect)

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    // Create threads for each topic/message Stream we are listening
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    val executorPool =
      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
    try {
      // Start the messages handler for each partition
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
      }
    } finally {
      executorPool.shutdown() // Just causes threads to terminate after work is done
    }
  }

  // Handles Kafka messages
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      logInfo("Starting MessageHandler.")
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          // The only functional change: store the topic alongside key and message.
          store((msgAndMetadata.key, msgAndMetadata.message, msgAndMetadata.topic))
        }
      } catch {
        case e: Throwable => reportError("Error handling message; exiting", e)
      }
    }
  }

}
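With that class compiled into your job, the Java side receives (key, message, topic) triples. A minimal usage sketch; note that Scala default arguments are not visible from Java, so the storage level has to be passed explicitly:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.kafka.MoreKafkaUtils;

import scala.Tuple3;

JavaReceiverInputDStream<Tuple3<String, String, String>> messages =
        MoreKafkaUtils.createStream(jssc, "localhost:2181", "test-group",
                topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

messages.foreachRDD(new Function<JavaRDD<Tuple3<String, String, String>>, Void>() {
    public Void call(JavaRDD<Tuple3<String, String, String>> rdd) {
        // Collected to the driver only for demo output.
        for (Tuple3<String, String, String> t : rdd.collect()) {
            System.out.println("topic=" + t._3() + " message=" + t._2());
        }
        return null;
    }
});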