Apu*_*rva 6 protocol-buffers apache-spark apache-spark-sql apache-spark-encoders
我需要编写一个读取 DataSet[Row] 并将其转换为 DataSet[CustomClass] 的作业,其中 CustomClass 是一个 protobuf 类。
val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
case Row(f1: String, f2: Long ) => {
val pbufClass = CustomClass.newBuilder()
.setF1(f1)
.setF2(f2)
pbufClass.build()}}(protoEncoder)
Run Code Online (Sandbox Code Playgroud)
但是,看起来 Protobuf 类并不是真正的 Java Bean,我确实在以下方面获得了 NPE
val x = Encoders.bean(classOf[CustomClass])
Run Code Online (Sandbox Code Playgroud)
如何确保作业可以发出 DataSet[CustomClass] 类型的数据集,其中 CustomClass 是 protobuf 类。关于为类编写自定义编码器的任何指针/示例?
NPE:
val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
... 48 elided
Run Code Online (Sandbox Code Playgroud)
Bean 编码器内部使用
JavaTypeInference.serializerFor(protoClass)
Run Code Online (Sandbox Code Playgroud)
如果我尝试在自定义编码器中执行相同操作,则会收到更具描述性的错误消息:
Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
Run Code Online (Sandbox Code Playgroud)
要将 Row 转换为 Protobuf 类,您可以使用SparkSQL-protobuf
该库提供了在 SparkSQL 中使用 Protobuf 对象的实用程序。它提供了一种将 SparkSQL 写入的 parquet 文件读回作为兼容 protobuf 对象的 RDD 的方法。它还可以将protobuf对象的RDD转换为DataFrame。
build.sbt向您的文件添加依赖项
resolvers += Resolver.jcenterRepo
libraryDependencies ++= Seq(
"com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
"org.apache.parquet" % "parquet-protobuf" % "1.8.1"
Run Code Online (Sandbox Code Playgroud)
)
您可以按照库中的一些示例开始
我希望这有帮助!
| 归档时间: |
|
| 查看次数: |
5864 次 |
| 最近记录: |