相关疑难解决方法(0)

使用模式将带有Spark的AVRO消息转换为DataFrame

有没有使用模式转换方式从消息?用户记录的模式文件:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}
Run Code Online (Sandbox Code Playgroud)

来自SqlNetworkWordCount示例Kafka,Spark和Avro的代码片段- 第3部分,生成和使用Avro消息来读取消息.

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()

  df.show() …
Run Code Online (Sandbox Code Playgroud)

scala avro apache-kafka apache-spark spark-streaming

13
推荐指数
1
解决办法
1万
查看次数