UnsatisfiedLinkError when writing to S3 with the Staging S3A Committer on Windows

Nem*_*ion · 3 · Tags: windows, hadoop, amazon-s3, apache-spark, apache-spark-sql

I am trying to write Parquet data to an AWS S3 directory with Apache Spark. I am on my local Windows 10 machine without Spark or Hadoop installed; instead I added them as SBT dependencies (Hadoop 3.2.1, Spark 2.4.5). My SBT file is as follows:

```scala
scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  "org.apache.spark" %% "spark-hadoop-cloud" % "2.3.2.3.1.0.6-1",

  "org.apache.hadoop" % "hadoop-client" % "3.2.1",
  "org.apache.hadoop" % "hadoop-common" % "3.2.1",
  "org.apache.hadoop" % "hadoop-aws" % "3.2.1",

  "com.amazonaws" % "aws-java-sdk-bundle" % "1.11.704"
)

dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-core" % "2.11.0",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0"
)

resolvers ++= Seq(
  "apache" at "https://repo.maven.apache.org/maven2",
  "hortonworks" at "https://repo.hortonworks.com/content/repositories/releases/"
)
```

I use the S3A Staging Directory Committer as described in the Hadoop and Cloudera documentation. I am also aware of these two questions on StackOverflow and used them to get the configuration right:


I have added all the required (to the best of my knowledge) configuration, including the two most recent Parquet-specific options:

```scala
val spark = SparkSession.builder()
  .appName("test-run-s3a-commiters")
  .master("local[*]")

  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .config("spark.hadoop.fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
  .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
  .config("spark.hadoop.fs.s3a.connection.maximum", "100")

  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
  .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
  .config("spark.hadoop.fs.s3a.committer.staging.unique-filenames", "true")
  .config("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true")
  .config("spark.hadoop.fs.s3a.buffer.dir", "tmp/")
  .config("spark.hadoop.fs.s3a.committer.staging.tmp.path", "hdfs_tmp/")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")

  .config("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

spark.sparkContext.setLogLevel("info")
```

From the logs I can see that the StagingCommitter is actually applied (I can also see intermediate data on my local file system under the specified path, and there is no `_temporary` directory in S3 during execution, as there would be with the default FileOutputCommitter).
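To watch that intermediate data while the job runs, a quick recursive listing of the local buffer/staging directories (`tmp/` and `hdfs_tmp/` in the configuration above) is enough. A minimal sketch, with a helper name of my own:

```scala
import java.io.File

// Hypothetical helper: recursively list files under a local staging/buffer
// directory (e.g. the fs.s3a.buffer.dir configured above) to observe the
// committer's intermediate output while the job is running.
object StagingInspector {
  def listFiles(dir: File): Seq[File] = {
    // listFiles() returns null for missing/non-directory paths; treat as empty
    val entries = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    entries.flatMap {
      case d if d.isDirectory => listFiles(d)
      case f                  => Seq(f)
    }
  }
}
```

Calling `StagingInspector.listFiles(new File("tmp/"))` periodically during the write shows the pending upload blocks appearing and disappearing as tasks commit.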


Then I run simple code to write test data to an S3 bucket:

```scala
import spark.implicits._

val sourceDF = spark
  .range(0, 10000)
  .map(id => {
    Thread.sleep(10)
    id
  })

sourceDF
  .write
  .format("parquet")
  .save("s3a://my/test/bucket/")
```

(I use `Thread.sleep` to simulate some processing and give myself a little time to check the intermediate contents of the local temporary directory and of the S3 bucket.)


However, I get a `java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat` error during the task commit attempt. Below is a piece of the logs (reduced to 1 executor) and the error stack trace.

```
20/05/09 15:13:18 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 15000
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0: duration 0:00.005s
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0: duration 0:00.019s
20/05/09 15:13:18 ERROR Utils: Aborting task
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Ljava/lang/String;)Lorg/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat;
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:460)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfoByNativeIO(RawLocalFileSystem.java:821)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:735)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:703)
    at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:52)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2091)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2071)
    at org.apache.hadoop.fs.FileSystem$5.hasNext(FileSystem.java:2190)
    at org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles(S3AUtils.java:1295)
    at org.apache.hadoop.fs.s3a.S3AUtils.flatmapLocatedFiles(S3AUtils.java:1333)
    at org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter(S3AUtils.java:1350)
    at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.getTaskOutput(StagingCommitter.java:385)
    at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.commitTask(StagingCommitter.java:641)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
    at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.commitTask(PathOutputCommitProtocol.scala:220)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/05/09 15:13:18 ERROR Utils: Aborting task
```

As far as I understand, the configuration is correct. The error is probably caused by some version incompatibility or by my local environment setup.
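One way to narrow down the environment side (a hedged sketch; the helper names are mine, not from Hadoop): an `UnsatisfiedLinkError` on `NativeIO$POSIX.stat` means the JVM either never loaded `hadoop.dll` or loaded one built for a different Hadoop version that lacks that native method. Checking which directories the JVM actually searches for native libraries can help:

```scala
import java.io.File

// Hypothetical diagnostic: list the directories the JVM searches for native
// libraries (hadoop.dll on Windows), and check whether a given library file
// is visible in any of them.
object NativeLibDiagnostic {
  def nativeLibDirs(): Seq[String] =
    System.getProperty("java.library.path", "")
      .split(File.pathSeparator)
      .filter(_.nonEmpty)
      .toSeq

  // True if some search directory actually contains the named library file
  def libVisible(libFileName: String): Boolean =
    nativeLibDirs().exists(dir => new File(dir, libFileName).isFile)
}
```

If `NativeLibDiagnostic.libVisible("hadoop.dll")` is false, or the visible `hadoop.dll` was built for an older Hadoop, the native `stat` call cannot bind and fails exactly as in the stack trace above.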


The provided code works as expected for ORC and CSV without any errors, but not for Parquet.


Please suggest what could be causing the error and how to resolve it.


Nem*_*ion · 8

For everyone who ends up here: I found the solution. As expected, the problem is not related to the S3A output committers or to the library dependencies.

The UnsatisfiedLinkError on the Java native method was raised because of a version incompatibility between the Hadoop version in my SBT dependencies and the winutils.exe (HDFS wrapper) on my Windows machine.

I downloaded the corresponding version from cdarlint/winutils and everything worked. Haha.
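For reference, a minimal sketch of wiring this up in code (the paths and the object name are my own assumptions, not part of the original setup): place `winutils.exe` and `hadoop.dll` from the matching `hadoop-3.2.1` folder of cdarlint/winutils under `C:\hadoop\bin`, then point Hadoop at that directory before the SparkSession is created. `hadoop.dll` additionally has to be visible on `java.library.path` (or in `C:\Windows\System32`) for the native `stat` call to bind.

```scala
// Sketch: point Hadoop at a local winutils installation on Windows.
// Assumes C:\hadoop\bin contains winutils.exe and hadoop.dll matching
// the Hadoop version on the classpath (3.2.1 here).
object HadoopHomeSetup {
  def configure(hadoopHome: String): Unit = {
    // Hadoop's Shell utility resolves <hadoop.home.dir>\bin\winutils.exe
    // from this system property (or from the HADOOP_HOME environment variable),
    // so it must be set before any Hadoop class is initialised.
    System.setProperty("hadoop.home.dir", hadoopHome)
  }
}
```

Call `HadoopHomeSetup.configure("C:\\hadoop")` at the very start of the application, before `SparkSession.builder()...getOrCreate()`.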