如何让 AWS 上的本地 Spark 写入 S3

bre*_*ttc 5 hadoop amazon-s3 apache-spark

我已经在 AWS EC2 实例上安装了 Spark 2.4.3 和 Hadoop 3.2。我一直在本地模式下使用 spark(主要是 pyspark)并取得了巨大的成功。能够旋转一些小的东西,然后在我需要动力时调整它的大小,并且非常快速地完成所有操作,这真是太好了。当我真的需要扩展时,我可以切换到 EMR 并去吃午饭。除了一个问题外,一切都很顺利:我无法让本地火花可靠地写入 S3(我一直在使用本地 EBS 空间)。这显然与有关 S3 作为文件系统的限制的文档中概述的所有问题有关。但是,使用最新的 hadoop,我对文档的阅读应该能够让它工作。

请注意,我知道另一个帖子,它提出了一个相关问题;这里有一些指导,但我看不到任何解决方案。如何使用新的 Hadoop parquet magic commiter 通过 Spark 自定义 S3 服务器

根据我对此处文档的最佳理解,我有以下设置(在不同地方设置):https : //hadoop.apache.org/docs/r3.2.1/hadoop-aws/tools/hadoop-aws/index.html

fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem  
fs.s3a.committer.name: directory   
fs.s3a.committer.magic.enabled: false  
fs.s3a.committer.threads: 8 
fs.s3a.committer.staging.tmp.path: /cache/staging  
fs.s3a.committer.staging.unique-filenames: true  
fs.s3a.committer.staging.conflict-mode: fail  
fs.s3a.committer.staging.abort.pending.uploads: true  
mapreduce.outputcommitter.factory.scheme.s3a: org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory  
fs.s3a.connection.maximum: 200  
fs.s3a.fast.upload: true  
Run Code Online (Sandbox Code Playgroud)

一个相关的观点是我正在使用镶木地板进行储蓄。我看到之前保存 Parquet 存在一些问题,但我在最新的文档中没有看到这一点。也许这就是问题所在?

无论如何,这是我得到的错误,这似乎表明 S3 在尝试重命名临时文件夹时出现的错误类型。是否有一些正确的设置可以使这种情况消失?

java.io.IOException: Failed to rename S3AFileStatus{path=s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/_temporary/0/_temporary/attempt_20190910022011_0004_m_000118_248/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet; isDirectory=false; length=185052; replication=1; blocksize=33554432; modification_time=1568082036000; access_time=0; owner=brett; group=brett; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE to s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:473)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:486)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:597)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:560)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
Run Code Online (Sandbox Code Playgroud)

小智 6

我帮助@brettc 进行了配置,我们找到了要设置的正确配置。

$SPARK_HOME/conf/spark-defaults.conf 下

# Enable S3 file system to be recognise
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

# Parameters to use new commiters
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.commiter.staging.conflict-mode replace
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class     org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
Run Code Online (Sandbox Code Playgroud)

如果您查看上面的最后 2 行配置,您会发现您需要包含PathOutputCommitProtocolBindingParquetOutputCommitter类的org.apache.spark.internal.io 库。为此,您必须在此处下载spark-hadoop-cloud jar (在我们的示例中,我们采用了 2.3.2.3.1.0.6-1 版本)并将其放在$SPARK_HOME/jars/ 下

您可以通过创建 parquet 文件轻松验证您正在使用新的提交者。_SUCCESS 文件应包含如下所示的 json:

{
  "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
  "timestamp" : 1574729145842,
  "date" : "Tue Nov 26 00:45:45 UTC 2019",
  "hostname" : "<hostname>",
  "committer" : "directory",
  "description" : "Task committer attempt_20191125234709_0000_m_000000_0",
  "metrics" : { [...] },
  "diagnostics" : { [...] },
  "filenames" : [...]
}
Run Code Online (Sandbox Code Playgroud)