bre*_*ttc 5 hadoop amazon-s3 apache-spark
我已经在 AWS EC2 实例上安装了 Spark 2.4.3 和 Hadoop 3.2。我一直在本地模式下使用 spark(主要是 pyspark)并取得了巨大的成功。能够旋转一些小的东西,然后在我需要动力时调整它的大小,并且非常快速地完成所有操作,这真是太好了。当我真的需要扩展时,我可以切换到 EMR 并去吃午饭。除了一个问题外,一切都很顺利:我无法让本地火花可靠地写入 S3(我一直在使用本地 EBS 空间)。这显然与有关 S3 作为文件系统的限制的文档中概述的所有问题有关。但是,使用最新的 hadoop,我对文档的阅读应该能够让它工作。
请注意,我知道另一个帖子,它提出了一个相关问题;这里有一些指导,但我看不到任何解决方案。如何使用新的 Hadoop parquet magic commiter 通过 Spark 自定义 S3 服务器
根据我对此处文档的最佳理解,我有以下设置(在不同地方设置):https : //hadoop.apache.org/docs/r3.2.1/hadoop-aws/tools/hadoop-aws/index.html
fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.committer.name: directory
fs.s3a.committer.magic.enabled: false
fs.s3a.committer.threads: 8
fs.s3a.committer.staging.tmp.path: /cache/staging
fs.s3a.committer.staging.unique-filenames: true
fs.s3a.committer.staging.conflict-mode: fail
fs.s3a.committer.staging.abort.pending.uploads: true
mapreduce.outputcommitter.factory.scheme.s3a: org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
fs.s3a.connection.maximum: 200
fs.s3a.fast.upload: true
Run Code Online (Sandbox Code Playgroud)
一个相关的观点是我正在使用镶木地板进行储蓄。我看到之前保存 Parquet 存在一些问题,但我在最新的文档中没有看到这一点。也许这就是问题所在?
无论如何,这是我得到的错误,这似乎表明 S3 在尝试重命名临时文件夹时出现的错误类型。是否有一些正确的设置可以使这种情况消失?
java.io.IOException: Failed to rename S3AFileStatus{path=s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/_temporary/0/_temporary/attempt_20190910022011_0004_m_000118_248/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet; isDirectory=false; length=185052; replication=1; blocksize=33554432; modification_time=1568082036000; access_time=0; owner=brett; group=brett; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE to s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:473)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:486)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:597)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:560)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
... 10 more
Run Code Online (Sandbox Code Playgroud)
小智 6
我帮助@brettc 进行了配置,我们找到了要设置的正确配置。
在$SPARK_HOME/conf/spark-defaults.conf 下
# Enable S3 file system to be recognise
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
# Parameters to use new commiters
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.commiter.staging.conflict-mode replace
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
Run Code Online (Sandbox Code Playgroud)
如果您查看上面的最后 2 行配置,您会发现您需要包含PathOutputCommitProtocol和BindingParquetOutputCommitter类的org.apache.spark.internal.io 库。为此,您必须在此处下载spark-hadoop-cloud jar (在我们的示例中,我们采用了 2.3.2.3.1.0.6-1 版本)并将其放在$SPARK_HOME/jars/ 下。
您可以通过创建 parquet 文件轻松验证您正在使用新的提交者。_SUCCESS 文件应包含如下所示的 json:
{
"name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
"timestamp" : 1574729145842,
"date" : "Tue Nov 26 00:45:45 UTC 2019",
"hostname" : "<hostname>",
"committer" : "directory",
"description" : "Task committer attempt_20191125234709_0000_m_000000_0",
"metrics" : { [...] },
"diagnostics" : { [...] },
"filenames" : [...]
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1376 次 |
| 最近记录: |