I'm using Spark 2.0.2 with Kafka 0.11.0, and I'm trying to consume messages from Kafka in a Spark Streaming job. Here is the code:
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = "notes"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:7092",
  "schema.registry.url" -> "http://localhost:7070",
  "group.id" -> "connect-cluster1",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String, String](
  SparkStream.ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  }
}
If the Kafka topic contains records, the output looks like this:
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"}
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"}
So, as the ConsumerRecord values show, the received messages are Avro-decoded. Now I need those records as a DataFrame, but I don't know how to proceed from here, even though I have the schema at hand:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.Schema

// Fetch the latest value schema for the topic from the Schema Registry.
val sr: CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = sr.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(sr)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)
The schema prints as follows:
{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}
Can anyone help me understand how to get a DataFrame from the ConsumerRecord's value? I have looked at other questions, such as converting Avro messages to a Spark DataFrame using a schema, and handling schema changes in a running Spark Streaming application, but in the first place they don't deal with a ConsumerRecord.
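For what it's worth, here is a rough sketch of one way to do this (untested against these exact versions): derive the column names from the Avro schema fetched above, turn each decoded record into a Row, and build a DataFrame per micro-batch. It assumes spark-avro's SchemaConverters is on the classpath and a SparkSession named spark is in scope; note that KafkaAvroDeserializer does not actually produce Strings, so typing the stream as [Object, Object] would be cleaner than the [String, String] used above.

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

// Derive the column layout from the Avro schema fetched above.
val sqlType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
// Treat every column as a string to sidestep the nullable unions and the
// timestamp-millis logical type; refine the column types later if needed.
val stringSchema = StructType(sqlType.fields.map(_.copy(dataType = StringType)))
val fieldNames = stringSchema.fieldNames

stream.foreachRDD { rdd =>
  val rows = rdd.map { record =>
    // At runtime the value is a GenericRecord, whatever the stream's type says.
    val avro = record.value().asInstanceOf[GenericRecord]
    Row(fieldNames.map(n => Option(avro.get(n)).map(_.toString).orNull): _*)
  }
  val df = spark.createDataFrame(rows, stringSchema)
  df.show()
}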
I'm new to Scala/Kafka/Spark myself, so I'm not sure this answers the question exactly, but it worked for me. I'm sure there are better ways to do it, so hopefully someone with more experience can provide a better answer.
import org.apache.spark.sql.SaveMode

// KafkaRDD
stream.foreachRDD { rdd =>
  // pull the values I'm looking for into a string array on the driver
  val x = rdd.map(row => row.value().toString).collect()
  // convert to a single-column DataFrame (one row per record)
  import spark.implicits._
  val df = x.toSeq.toDF("record")
  // write the data frame to the datastore (MySQL in my case)
  df.write
    .mode(SaveMode.Append)
    .jdbc(url, table, props)
}
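A possible refinement (again just a sketch, not something I've verified): since the values print as JSON, you can let Spark infer the real columns instead of storing the whole record as a single string. This assumes the same spark session, url, table, and props as above.

stream.foreachRDD { rdd =>
  // Render each Avro value as its JSON string form.
  val jsonRdd = rdd.map(_.value().toString)
  if (!jsonRdd.isEmpty()) {
    // Spark infers the id/createdat/createdby/notes columns from the JSON text.
    val df = spark.read.json(jsonRdd)
    df.write.mode(SaveMode.Append).jdbc(url, table, props)
  }
}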