如何将行映射到 protobuf 生成的类?

Apu*_*rva 6 protocol-buffers apache-spark apache-spark-sql apache-spark-encoders

我需要编写一个读取 DataSet[Row] 并将其转换为 DataSet[CustomClass] 的作业,其中 CustomClass 是一个 protobuf 类。

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}(protoEncoder)
Run Code Online (Sandbox Code Playgroud)

但是,看起来 Protobuf 类并不是真正的 Java Bean,我确实在以下方面获得了 NPE

val x =  Encoders.bean(classOf[CustomClass])
Run Code Online (Sandbox Code Playgroud)

如何确保作业可以发出 DataSet[CustomClass] 类型的数据集,其中 CustomClass 是 protobuf 类。关于为类编写自定义编码器的任何指针/示例?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided
Run Code Online (Sandbox Code Playgroud)

Bean 编码器内部使用

JavaTypeInference.serializerFor(protoClass)
Run Code Online (Sandbox Code Playgroud)

如果我尝试在自定义编码器中执行相同操作,则会收到更具描述性的错误消息:

Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
Run Code Online (Sandbox Code Playgroud)

Sha*_*ala 2

要将 Row 转换为 Protobuf 类,您可以使用SparkSQL-protobuf

该库提供了在 SparkSQL 中使用 Protobuf 对象的实用程序。它提供了一种将 SparkSQL 写入的 parquet 文件读回作为兼容 protobuf 对象的 RDD 的方法。它还可以将protobuf对象的RDD转换为DataFrame。

build.sbt向您的文件添加依赖项

resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1"
Run Code Online (Sandbox Code Playgroud)

您可以按照库中的一些示例开始

实施例1

实施例2

我希望这有帮助!