Is there a way to convert Avro messages from Kafka into a DataFrame in Spark using the schema? The schema file for the user records:
{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}
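One way to use the schema file directly, instead of restating its fields by hand, is to let Spark derive the DataFrame column types from it. This is a minimal sketch, assuming Spark 2.4 or later with the `spark-avro` module on the classpath (it provides `org.apache.spark.sql.avro.SchemaConverters`). The object name `UserSchemaToSparkType` is hypothetical, and the schema is inlined rather than loaded from `/user_schema.json` so the sketch is self-contained:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Hypothetical helper object: maps the Avro "user" record schema to a Spark StructType.
object UserSchemaToSparkType {
  // Same content as user_schema.json, inlined for the sketch
  val userSchemaJson: String =
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "firstName", "type": "string"},
      |  {"name": "lastName", "type": "string"}
      |]}""".stripMargin

  val avroSchema: Schema = new Schema.Parser().parse(userSchemaJson)

  // toSqlType returns SchemaType(dataType, nullable); an Avro record becomes a StructType
  val sparkSchema: StructType =
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

  def main(args: Array[String]): Unit =
    println(sparkSchema.treeString)
}
```

Because the column names and types come from the Avro schema, a change to the schema file changes the DataFrame schema without touching a case class.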
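The code below, which reads the messages, is adapted from the SqlNetworkWordCount example and from the article "Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages":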
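import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD

// Target type for toDF(); defined outside any method so Spark can derive its schema
case class User(firstName: String, lastName: String)
object Injection {
  // Parse the Avro schema from the classpath and build a binary codec.
  // The Bijection type is fully qualified so it does not clash with this object's name.
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: com.twitter.bijection.Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  // Decode each Kafka value into a GenericRecord, then copy its fields into User
  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString))
    .toDF()
  df.show() …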
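The `User(...)` mapping above hardcodes the field names. A schema-driven alternative could look like the sketch below. It assumes the Kafka values are plain Avro binary (as produced by `GenericAvroCodecs.toBinary`, with no registry header), that the record has only flat primitive fields, and that `spark-avro` is available for `SchemaConverters`. `AvroBytesToDataFrame` and `toDataFrame` are hypothetical names, and decoding uses Avro's own `GenericDatumReader` in place of Bijection:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.JavaConverters._

// Hypothetical helper: Avro-encoded payloads + schema JSON -> DataFrame, no case class needed.
object AvroBytesToDataFrame {
  def toDataFrame(spark: SparkSession, payloads: RDD[Array[Byte]], schemaJson: String): DataFrame = {
    val driverSchema = new Schema.Parser().parse(schemaJson)
    // Column names and types are derived from the Avro schema
    val sparkSchema = SchemaConverters.toSqlType(driverSchema).dataType.asInstanceOf[StructType]
    val fieldNames = driverSchema.getFields.asScala.map(_.name).toList

    val rows = payloads.mapPartitions { iter =>
      // Only the schema JSON string and field names are shipped to executors;
      // the schema is re-parsed once per partition.
      val schema = new Schema.Parser().parse(schemaJson)
      val reader = new GenericDatumReader[GenericRecord](schema)
      iter.map { bytes =>
        val record = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
        Row.fromSeq(fieldNames.map { name =>
          record.get(name) match {
            case s: CharSequence => s.toString // Avro strings are decoded as Utf8
            case other => other                // assumption: other fields are flat primitives
          }
        })
      }
    }
    spark.createDataFrame(rows, sparkSchema)
  }
}
```

Inside `foreachRDD` it could be called as `AvroBytesToDataFrame.toDataFrame(spark, rdd.map(_._2), schemaJson)`, where `spark` is a `SparkSession` and `schemaJson` holds the contents of `user_schema.json`. Nested records, arrays, or unions would need extra conversion logic that this sketch does not attempt.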