shi*_*ali 5 apache-kafka spark-streaming spark-streaming-kafka
I am trying to read from a Kafka source and want to extract the timestamp from the received messages for Spark Structured Streaming. Kafka (version 0.10.0.0), Spark Streaming (version 2.0.1).
小智 1
I would suggest a couple of things.

Assume you create your stream via the newer Kafka integration API (for Kafka 0.10), e.g. with the dependency:

"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"

Following that package's documentation, you create a stream:
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "spark-streaming-test",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val sparkConf = new SparkConf()
// suppose you have a 60-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(60))
ssc.checkpoint("checkpoint")

val topics = Seq("your-topic") // the topic(s) you want to subscribe to
val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))
Your stream will be a DStream of ConsumerRecord[String, Array[Byte]], and you can get the timestamp along with the key and value like this:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
Hope this helps.
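Since the question mentions Structured Streaming specifically: the Kafka source in the spark-sql-kafka-0-10 package already exposes the record timestamp as a `timestamp` column, so no manual extraction is needed there. A minimal sketch (the broker addresses, topic name, and app name below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("kafka-timestamp") // placeholder app name
  .getOrCreate()

// The Kafka source produces the columns:
// key, value, topic, partition, offset, timestamp, timestampType
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "your-topic") // placeholder topic
  .load()
  .selectExpr("CAST(key AS STRING)", "value", "timestamp")

You can then use the `timestamp` column directly, e.g. for windowing or watermarking in the streaming query.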
Viewed: 5010 times