Saving Spark DataFrames with nested user-defined types

Joã*_*rte 6 apache-spark apache-spark-sql

I want to save (as a parquet file) a Spark DataFrame that contains a custom class as a column. The class is composed of a Seq of another custom class. To do so, I create a UserDefinedType for each of the classes, in a similar way to VectorUDT. I can use the dataframe as I intended, but I cannot save it to disk as parquet (or json). I reported it as a bug, but maybe there is a problem with my code. I have implemented a simpler example to show the problem:

import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1)
      row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
    }
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num:Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
    case B(num) =>
      val row = new GenericMutableRow(1)
      row.setInt(0, num)
      row
  }

  override def deserialize(datum: Any): B = {
    datum match {
      case row: InternalRow => new B(row.getInt(0))
    }
  }
}

object BUDT extends BUDT

object TestNested {
  def main(args:Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))),
                  new A(Seq(new B(3), new B(4))))

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(1 to 2 zip col).toDF()
    df.show()

    df.write.mode(SaveMode.Overwrite).save(...)
  }
}

This results in the following error:

15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: Nested type should be repeated: required group array {
  required int32 num;
}
  at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42)
  at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97)
  at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460)
  at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318)
  at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522)
  at org.apache.spark.sql.execution.datasources.parquet. ...

If I save a dataframe with B instead of A, there is no problem, since B does not contain a nested custom class. Am I missing something?
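
For reference, a minimal sketch of the flat case described above, which saves without error (the variable names and output path are illustrative placeholders, not from the original program):

val colB = Seq(B(1), B(2))
val dfB = sc.parallelize(1 to 2 zip colB).toDF()
// No nested UDT is involved here, so writing succeeds.
dfB.write.mode(SaveMode.Overwrite).parquet("/tmp/only_b.parquet")  // illustrative path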

Spi*_*lov 4

I had to make four changes to your code to get it working (tested with Spark 1.6.0 on Linux), and I think I can mostly explain why they are needed. I do find myself wondering, however, whether there is a simpler solution. All the changes are in AUDT, as follows:

  1. When defining sqlType, make it depend on BUDT.sqlType rather than just BUDT.
  2. In serialize(), call BUDT.serialize() on each list element.
  3. In deserialize():
    • call toArray(BUDT.sqlType) instead of toArray(BUDT)
    • call BUDT.deserialize() on each element

Here is the resulting code:

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType =
    StructType(
      Seq(StructField("list",
                      ArrayType(BUDT.sqlType, containsNull = false),
                      nullable = true)))

  override def userClass: Class[A] = classOf[A]

  override def serialize(obj: Any): Any =
    obj match {
      case A(list) =>
        val row = new GenericMutableRow(1)
        // Delegate to BUDT so every element is stored in its Catalyst form
        val elements =
          list.map(_.asInstanceOf[Any])
              .map(e => BUDT.serialize(e))
              .toArray
        row.update(0, new GenericArrayData(elements))
        row
    }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow =>
        // Read the array with BUDT's sqlType, then hand each element back to BUDT
        val first = row.getArray(0)
        val bs: Array[InternalRow] = first.toArray(BUDT.sqlType)
        val bseq = bs.toSeq.map(e => BUDT.deserialize(e))
        val a = new A(bseq)
        a
    }
  }

}

All four changes have the same character: the relationship between the handling of A's and the handling of B's is now made explicit, for the schema type, for serialization, and for deserialization. The original code seems to have relied on the assumption that Spark SQL would just figure this out, which may be reasonable, but apparently it does not.
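
As a quick sanity check, a sketch of a round trip with the fixed AUDT/BUDT, reusing sc, sqlContext, and col from the question (the output path is an illustrative placeholder):

val df = sc.parallelize(1 to 2 zip col).toDF()
df.write.mode(SaveMode.Overwrite).parquet("/tmp/nested_udt.parquet")  // illustrative path
// Reading the file back should restore the A column through the UDTs
val readBack = sqlContext.read.parquet("/tmp/nested_udt.parquet")
readBack.show()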