如何将RDD [GenericRecord]转换为scala中的dataframe?

Sli*_*AIZ 1 scala avro apache-spark spark-dataframe

我从Avaf(序列化器和反序列化器)获得了kafka主题的推文.然后我创建了一个Spark消费者,它在RDD [GenericRecord]的Dstream中提取推文.现在我想将每个rdd转换为数据帧,以通过SQL分析这些推文.有什么解决方案可以将RDD [GenericRecord]转换为数据帧吗?

hla*_*gos 7

我花了一些时间来尝试这项工作(特别是如何正确地反序列化数据,但看起来你已经覆盖了这个)...更新

  //Define function to convert from GenericRecord to Row
  def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
    val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
    import scala.collection.JavaConversions._
    for (field <- record.getSchema.getFields) {
      objectArray(field.pos) = record.get(field.pos)
    }

    new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
  }

//Inside your stream foreachRDD
val yourGenericRecordRDD = ... 
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))

var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])
Run Code Online (Sandbox Code Playgroud)

如您所见,我使用SchemaConverter从您用于反序列化的模式中获取数据帧结构(这可能会对模式注册表更加痛苦).为此,您需要以下依赖项

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>
Run Code Online (Sandbox Code Playgroud)

你需要根据自己的需要改变你的火花版本.

更新:上面的代码仅适用于平面 avro架构.

对于嵌套结构,我使用了不同的东西.你可以复制SchemaConverters类,它必须在里面com.databricks.spark.avro(它使用databricks包中的一些受保护的类),或者你可以尝试使用spark-bigquery依赖项.默认情况下,该类不可访问,因此您需要在包内创建一个类com.databricks.spark.avro来访问工厂方法.

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

class SchemaConverterUtils {

  def converterSql(schema : Schema, sqlType : StructType) = {
    createConverterToSQL(schema, sqlType)
  }

}
Run Code Online (Sandbox Code Playgroud)

之后你应该能够像这样转换数据

val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
/// 
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
... 
val rowRdd = genericRecordRDD.flatMap(record => {
        Try(converter(record).asInstanceOf[Row]).toOption
      })
//To DataFrame
 val df = sqlContext.createDataFrame(rowRdd, sqlType)
Run Code Online (Sandbox Code Playgroud)