小编Apu*_*rva的帖子

如何将行映射到 protobuf 生成的类?

我需要编写一个读取 DataSet[Row] 并将其转换为 DataSet[CustomClass] 的作业,其中 CustomClass 是一个 protobuf 类。

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}(protoEncoder)
Run Code Online (Sandbox Code Playgroud)

但是,看起来 Protobuf 类并不是真正的 Java Bean,我确实在以下方面获得了 NPE

val x =  Encoders.bean(classOf[CustomClass])
Run Code Online (Sandbox Code Playgroud)

如何确保作业可以发出 DataSet[CustomClass] 类型的数据集,其中 CustomClass 是 protobuf 类。关于为类编写自定义编码器的任何指针/示例?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89) …
Run Code Online (Sandbox Code Playgroud)

protocol-buffers apache-spark apache-spark-sql apache-spark-encoders

6
推荐指数
1
解决办法
5864
查看次数

Apache Spark(结构化流):S3 Checkpoint支持

来自spark结构化流媒体文档:"此检查点位置必须是HDFS兼容文件系统中的路径,并且可以DataStreamWriter在启动查询时设置为选项."

当然,将检查点设置为s3路径会抛出:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:498) 
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 
Run Code Online (Sandbox Code Playgroud)

这里有几个问题:

  1. 为什么不支持s3作为检查点目录(常规火花流支持这个)?什么使文件系统"符合HDFS"?
  2. 我一直使用HDFS(因为集群可以一直上升或下降)并使用s3作为持久保存所有数据的地方 - 在这样的设置中存储结构化流数据的检查点数据的建议是什么?

apache-spark spark-structured-streaming

5
推荐指数
3
解决办法
2742
查看次数