使用sc.textFile("s3n:// ...)从S3读取Spark文件

Pol*_*ase 45 hortonworks-data-platform apache-spark rdd

尝试使用spark-shell读取位于S3中的文件:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12

scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    ... etc ...
Run Code Online (Sandbox Code Playgroud)

IOException异常:没有文件系统的方案:S3N与发生错误:

  • 开发机器上的Spark 1.31或1.40(没有Hadoop库)
  • Hortonworks Sandbox HDP v2.2.4(Hadoop 2.60)运行,它集成了Spark 1.2.1开箱即用
  • 使用s3://或s3n:// scheme

这个错误的原因是什么?缺少依赖,缺少配置或误用sc.textFile()

或者可能是因为这个帖子似乎暗示了影响Hadoop 2.60特有的Spark构建的错误.我将尝试Spark for Hadoop 2.40,看看这是否解决了这个问题.

Pol*_*ase 43

确认这与针对Hadoop 2.60的Spark构建有关.刚刚安装了Spark 1.4.0"Pre for Hadoop 2.4及更高版本"(而不是Hadoop 2.6).现在代码工作正常.

sc.textFile("s3n://bucketname/Filename") 现在引发另一个错误:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
Run Code Online (Sandbox Code Playgroud)

下面的代码使用S3 URL格式来显示Spark可以读取S3文件.使用开发机器(没有Hadoop库).

scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> lyrics.count
res1: Long = 9
Run Code Online (Sandbox Code Playgroud)

甚至更好:如果AWS Secret Key具有前向"/",则上面使用S3N URI内联AWS凭证的代码将会中断.在SparkContext中配置AWS凭证将修复它.无论S3文件是公共还是私有,代码都有效.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count
Run Code Online (Sandbox Code Playgroud)


Ser*_*sev 33

尽管这个问题已经得到了一个公认的答案,但我认为为什么会发生这种情况的具体细节仍然缺失.所以我认为可能还有一个地方可以再提一个答案.

如果添加所需的hadoop-aws依赖项,则代码应该可以正常工作.

启动Hadoop 2.6.0后,s3 FS连接器已移至名为hadoop-aws的独立库中.还有一个Jira: 将与s3相关的FS连接器代码移动到hadoop-aws.

这意味着针对Hadoop 2.6.0或更新版本构建的任何版本的spark都必须使用另一个外部依赖项才能连接到S3文件系统.
这是一个我尝试过的sbt示例,并且使用针对Hadoop 2.6.0构建的Apache Spark 1.6.2正在按预期工作:

libraryDependencies + ="org.apache.hadoop"%"hadoop-aws"%"2.6.0"

在我的情况下,我遇到了一些依赖项问题,所以我通过添加排除来解决:

libraryDependencies + ="org.apache.hadoop"%"hadoop-aws"%"2.6.0"exclude("tomcat","jasper-compiler")excludeAll ExclusionRule(organization ="javax.servlet")

在其他相关说明中,我还没有尝试过,但是建议使用"s3a"而不是"s3n"文件系统启动Hadoop 2.6.0.

第三代,s3a:文件系统.该文件系统绑定旨在成为替代s3n:的交换机,支持更大的文件并承诺更高的性能.


And*_*w K 16

您可以将--packages参数与相应的jar:添加到您的提交中:

bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py
Run Code Online (Sandbox Code Playgroud)


kbt*_*kbt 8

这是一个示例火花代码,可以读取s3上的文件

val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
var jobInput = sparkContext.textFile("s3://" + s3_location)
Run Code Online (Sandbox Code Playgroud)


小智 7

我不得不将hadoop下载中的jar文件复制到$SPARK_HOME/jars目录中。使用该--jars标志或该--packages标志进行火花提交不起作用。

细节:

  • 星火2.3.0
  • Hadoop下载为2.7.6
  • 复制的两个jar文件来自 (hadoop dir)/share/hadoop/tools/lib/
    • aws-java-sdk-1.7.4.jar
    • hadoop-aws-2.7.6.jar


小智 6

在Spark 2.0.2中遇到同样的问题.通过喂它罐子解决它.这是我跑的:

$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar

scala> val hadoopConf = sc.hadoopConfiguration
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.read.parquet("s3://your-s3-bucket/")
Run Code Online (Sandbox Code Playgroud)

很明显,你需要在你运行spark-shell的路径中放置jar


Ste*_*ran 5

Spark JIRA,SPARK-7481,于今天(2016 年 10 月 20 日)开放,用于添加 Spark-cloud 模块,其中包括对所有 s3a 和 azure wasb 的传递依赖:需要以及测试。

还有与之匹配的Spark PR 。这就是我如何在我的 Spark 构建中获得 s3a 支持

如果您手动执行此操作,则必须获得与其余 hadoop JAR 具有的确切版本的 hadoop-aws JAR,以及与 Hadoop aws 编译所针对的版本 100% 同步的 AWS JAR 版本。对于 Hadoop 2.7。{1,2,3,...}

hadoop-aws-2.7.x.jar 
aws-java-sdk-1.7.4.jar
joda-time-2.9.3.jar
+ jackson-*-2.6.5.jar
Run Code Online (Sandbox Code Playgroud)

将所有这些粘贴到SPARK_HOME/jars. 使用环境变量或中设置的凭据运行 Sparkspark-default.conf

最简单的测试是你能计算 CSV 文件的行数吗

val landsatCSV = "s3a://landsat-pds/scene_list.gz"
val lines = sc.textFile(landsatCSV)
val lineCount = lines.count()
Run Code Online (Sandbox Code Playgroud)

拿到号码:一切都好。获取堆栈跟踪。坏消息。