Aru*_*air · 5 · tags: java, cassandra, apache-kafka, apache-spark
How can I identify the topic name of a message consumed from Kafka?
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = { "test", "test1", "test2" };
for (String t : topics) {
    topicMap.put(t, 3);
}

SparkConf conf = new SparkConf().setAppName("KafkaReceiver")
        .set("spark.streaming.receiver.writeAheadLog.enable", "false")
        .setMaster("local[4]")
        .set("spark.cassandra.connection.host", "localhost");

final JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));

/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
        .createStream(jssc, "localhost:2181", "test-group", topicMap);

JavaDStream<MessageAndMetadata> data = messages
        .map(new Function<Tuple2<String, String>, MessageAndMetadata>() {
            public MessageAndMetadata call(Tuple2<String, String> message) {
                System.out.println("message = " + message._2());
                // The topic name is not available here -- the Tuple2 only
                // carries the key and the message body.
                return null;
            }
        });
I can receive messages from the Kafka producer, but since the consumer is now consuming from three topics, I need to identify which topic each message came from.
Unfortunately, this is not straightforward, because in Spark's source code both KafkaReceiver and ReliableKafkaReceiver store only MessageAndMetadata.key and the message body.
There are two open tickets related to this problem in Spark's JIRA, and they have been open for a while.
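In the meantime, if you can move to the direct (receiver-less) API introduced in Spark 1.3, the createDirectStream overload that takes a messageHandler hands you kafka.message.MessageAndMetadata, which does carry the topic name. A minimal Java sketch under stated assumptions: the broker address below is made up, and every topic is read from partition 0 at offset 0 (a real job would look up partitions and committed offsets first):

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// topics and jssc as in the question
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // assumed broker address

// Naive starting offsets: partition 0, offset 0 for every topic.
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
for (String t : topics) {
    fromOffsets.put(new TopicAndPartition(t, 0), 0L);
}

// The messageHandler sees the full MessageAndMetadata, so the topic
// name survives into the resulting DStream.
JavaInputDStream<String> tagged = KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        String.class,
        kafkaParams, fromOffsets,
        (Function<MessageAndMetadata<String, String>, String>)
                mmd -> mmd.topic() + " => " + mmd.message());

tagged.print();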
Otherwise, a dirty copy/paste/modify of Spark's own source code will solve your problem:
package org.apache.spark.streaming.kafka

import java.lang.{Integer => JInt}
import java.util.{Map => JMap, Properties}

import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.ThreadUtils

import scala.collection.JavaConverters._
import scala.collection.Map
import scala.reflect._

object MoreKafkaUtils {

  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): JavaReceiverInputDStream[(String, String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(jssc.ssc.conf)
    new KafkaInputDStreamWithTopic[String, String, StringDecoder, StringDecoder](
      jssc.ssc, kafkaParams, topics.asScala.mapValues(_.intValue()), walEnabled, storageLevel)
  }
}

private[streaming]
class KafkaInputDStreamWithTopic[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_] : ClassTag,
  T <: Decoder[_] : ClassTag](
    @transient ssc_ : StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[(K, V, String)](ssc_) with Logging {

  def getReceiver(): Receiver[(K, V, String)] = {
    if (!useReliableReceiver) {
      new KafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      new ReliableKafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }
}

private[streaming]
class KafkaReceiverWithTopic[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_] : ClassTag,
  T <: Decoder[_] : ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ) extends Receiver[(K, V, String)](storageLevel) with Logging {

  // Connection to Kafka
  var consumerConnector: ConsumerConnector = null

  def onStop() {
    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }
  }

  def onStart() {

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

    // Kafka connection properties
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))

    val zkConnect = kafkaParams("zookeeper.connect")
    // Create the connection to the cluster
    logInfo("Connecting to Zookeeper: " + zkConnect)
    val consumerConfig = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConfig)
    logInfo("Connected to " + zkConnect)

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    // Create threads for each topic/message Stream we are listening
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    val executorPool =
      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
    try {
      // Start the messages handler for each partition
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
      }
    } finally {
      executorPool.shutdown() // Just causes threads to terminate after work is done
    }
  }

  // Handles Kafka messages
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      logInfo("Starting MessageHandler.")
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          store((msgAndMetadata.key, msgAndMetadata.message, msgAndMetadata.topic))
        }
      } catch {
        case e: Throwable => reportError("Error handling message; exiting", e)
      }
    }
  }
}
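From Java, the stream produced by this variant yields (key, message, topic) Tuple3 records. A hedged usage sketch follows; note that Scala's default storage-level argument is not visible from Java, so it has to be passed explicitly, and that ReliableKafkaReceiverWithTopic, referenced in getReceiver above, needs the same one-line change to its store call:

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.kafka.MoreKafkaUtils;
import scala.Tuple3;

// topicMap and jssc as in the question; each record now carries its topic.
JavaReceiverInputDStream<Tuple3<String, String, String>> stream =
        MoreKafkaUtils.createStream(jssc, "localhost:2181", "test-group",
                topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

stream.map(t -> "topic = " + t._3() + ", message = " + t._2())
        .print();

jssc.start();
jssc.awaitTermination();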