Sli*_*AIZ 1 scala avro apache-spark spark-dataframe
我从Avaf(序列化器和反序列化器)获得了kafka主题的推文.然后我创建了一个Spark消费者,它在RDD [GenericRecord]的Dstream中提取推文.现在我想将每个rdd转换为数据帧,以通过SQL分析这些推文.有什么解决方案可以将RDD [GenericRecord]转换为数据帧吗?
我花了一些时间来尝试这项工作(特别是如何正确地反序列化数据,但看起来你已经覆盖了这个)...更新
//Define function to convert from GenericRecord to Row
def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
import scala.collection.JavaConversions._
for (field <- record.getSchema.getFields) {
objectArray(field.pos) = record.get(field.pos)
}
new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
}
//Inside your stream foreachRDD
val yourGenericRecordRDD = ...
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))
var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])
Run Code Online (Sandbox Code Playgroud)
如您所见,我使用SchemaConverter从您用于反序列化的模式中获取数据帧结构(这可能会对模式注册表更加痛苦).为此,您需要以下依赖项
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>3.2.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
你需要根据自己的需要改变你的火花版本.
更新:上面的代码仅适用于平面 avro架构.
对于嵌套结构,我使用了不同的东西.你可以复制SchemaConverters类,它必须在里面com.databricks.spark.avro(它使用databricks包中的一些受保护的类),或者你可以尝试使用spark-bigquery依赖项.默认情况下,该类不可访问,因此您需要在包内创建一个类com.databricks.spark.avro来访问工厂方法.
package com.databricks.spark.avro
import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
class SchemaConverterUtils {
def converterSql(schema : Schema, sqlType : StructType) = {
createConverterToSQL(schema, sqlType)
}
}
Run Code Online (Sandbox Code Playgroud)
之后你应该能够像这样转换数据
val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
///
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
...
val rowRdd = genericRecordRDD.flatMap(record => {
Try(converter(record).asInstanceOf[Row]).toOption
})
//To DataFrame
val df = sqlContext.createDataFrame(rowRdd, sqlType)
Run Code Online (Sandbox Code Playgroud)