在 Spark 中启用目录提交者

Div*_* DD 5 amazon-s3 apache-spark

我正在尝试使用 S3A 分区(或目录,因为我只需要确认提交者是否按预期工作)提交者与 Spark。我正在关注这个链接,它应该很简单,但是我在解决上一个问题时遇到了新问题

用于测试的代码是(内部spark-shell):

val sourceDF = spark.range(0, 10000)
val datasets = "s3a://bucket-name/test"
sourceDF.write.format("orc").save(datasets + "orc")
Run Code Online (Sandbox Code Playgroud)

spark-defaults.conf 是:

spark.hadoop.fs.s3a.committer.name directory

spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter


Error 1:
scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoClassDefFoundError: 
org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org .apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 81 more
Run Code Online (Sandbox Code Playgroud)

然后我从这个链接复制 spark-hadoop-cloud_2.11-2.3.1.3.0.2.0-50.jar到 spark/jars 文件夹

这解决了之前的“NoClassDefFoundError”,但产生了新的类定义错误,即:

错误 2:

java.lang.NoClassDefFoundError: 
org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
....
Run Code Online (Sandbox Code Playgroud)

如果需要,可以粘贴完整的堆栈跟踪

之后,我将 hadoop-mapreduce-client-core-3.1.1.jar 复制到 spark/jars 文件夹中,并再次在 spark-shell 中运行测试代码。这次我得到以下错误:

在此之后,我被卡住了。

错误 3(以及我被卡住的最终错误):

scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoSuchMethodError: 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.<init>(Ljava/lang/String;Ljava/lang/String;Z)V
at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.<init>(PathOutputCommitProtocol.scala:60)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
... 48 elided
Run Code Online (Sandbox Code Playgroud)

这看起来像是不正确的 jar 问题,但我找不到正确的问题。这个问题与上一个问题类似,但找不到相关答案,因此再次发布。

Div*_* DD 0

我能够完成这项工作。问题出在我的 Spark 版本上。我使用的是spark 2.2.1版本,而至少需要spark 2.3.1。问题中提到的最后一个错误是指向错误的构造函数。经过一番挖掘,我发现spark-core_2.11-2.2.1.jar正在使用两个参数构造函数,而spark-hadoop-cloud_2.11-2.3.1.3.0.2.0-50.jar期望3参数构造函数仅出现在spark-core_2.11-2.3.1.jar版本中。经过版本升级并进行一些调整后,我能够对此进行测试。

运行此命令查看问题:

javap -classpath Spark-core_2.11-2.2.1.jar org/apache/spark/internal/io/HadoopMapReduceCommitProtocol

javap -classpath Spark-core_2.11-2.3.1.jar org/apache/spark/internal/io/HadoopMapReduceCommitProtocol

只是为了让您知道,我下载了没有预构建 hadoop 版本的 Spark 2.3.1,然后使用 hadoop 3.1.0 jar 配置它并能够使其工作。