pko*_*lov 10 scala amazon-s3 apache-spark
我有一个公共可用的Amazon s3资源(文本文件),并希望从spark访问它.这意味着 - 我没有任何亚马逊凭证 - 如果我只想下载它,它可以正常工作:
val bucket = "<my-bucket>"
val key = "<my-key>"
val client = new AmazonS3Client
val o = client.getObject(bucket, key)
val content = o.getObjectContent // <= can be read and used as input stream
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试从spark上下文访问相同的资源时
val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val f = sc.textFile(s"s3a://$bucket/$key")
println(f.count())
Run Code Online (Sandbox Code Playgroud)
我在stacktrace中收到以下错误:
Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at com.example.Main$.main(Main.scala:14)
at com.example.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Run Code Online (Sandbox Code Playgroud)
我不想提供任何AWS凭证 - 我只想匿名访问资源(目前) - 如何实现这一目标?我可能需要使用像AnonymousAWSCredentialsProvider这样的东西 - 但是如何将它放在spark或hadoop中呢?
PS我的build.sbt以防万一
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.4.1",
"org.apache.hadoop" % "hadoop-aws" % "2.7.1"
)
Run Code Online (Sandbox Code Playgroud)
更新:在我做了一些调查之后 - 我看到了它不起作用的原因.
首先,S3AFileSystem使用以下凭证顺序创建AWS客户端:
AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
new BasicAWSCredentialsProvider(accessKey, secretKey),
new InstanceProfileCredentialsProvider(),
new AnonymousAWSCredentialsProvider()
);
Run Code Online (Sandbox Code Playgroud)
"accessKey"和"secretKey"值取自spark conf实例(键必须为"fs.s3a.access.key"和"fs.s3a.secret.key"或org.apache.hadoop.fs.s3a.Constants .ACCESS_KEY和org.apache.hadoop.fs.s3a.Constants.SECRET_KEY常量,这样更方便).
第二 - 您可能会看到AnonymousAWSCredentialsProvider是第三个选项(最后优先级) - 可能出现的问题是什么?查看AnonymousAWSCredentials的实现:
public class AnonymousAWSCredentials implements AWSCredentials {
public String getAWSAccessKeyId() {
return null;
}
public String getAWSSecretKey() {
return null;
}
}
Run Code Online (Sandbox Code Playgroud)
它只是为访问密钥和密钥返回null.听起来很合理.但请查看AWSCredentialsProviderChain:
AWSCredentials credentials = provider.getCredentials();
if (credentials.getAWSAccessKeyId() != null &&
credentials.getAWSSecretKey() != null) {
log.debug("Loading credentials from " + provider.toString());
lastUsedProvider = provider;
return credentials;
}
Run Code Online (Sandbox Code Playgroud)
如果两个键都为空,它不会选择提供程序 - 这意味着匿名凭据不起作用.看起来像aws-java-sdk-1.7.4中的一个bug.我尝试使用最新版本 - 但它与hadoop-aws-2.7.1不兼容.
还有其他想法吗?
这对我有帮助:
val session = SparkSession.builder()
.appName("App")
.master("local[*]")
.config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
.getOrCreate()
val df = session.read.csv(filesFromS3:_*)
Run Code Online (Sandbox Code Playgroud)
版本:
"org.apache.spark" %% "spark-sql" % "2.4.0",
"org.apache.hadoop" % "hadoop-aws" % "2.8.5",
Run Code Online (Sandbox Code Playgroud)
文档: https: //hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Authentication_properties
我个人从未访问过 Spark 的公共数据。您可以尝试使用虚拟凭据,或者专门为此用途创建凭据。直接在 SparkConf 对象上设置它们。
val sparkConf: SparkConf = ???
val accessKeyId: String = ???
val secretAccessKey: String = ???
sparkConf.set("spark.hadoop.fs.s3.awsAccessKeyId", accessKeyId)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", accessKeyId)
sparkConf.set("spark.hadoop.fs.s3.awsSecretAccessKey", secretAccessKey)
sparkConf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", secretAccessKey)
Run Code Online (Sandbox Code Playgroud)
作为替代方案,请阅读 的文档DefaultAWSCredentialsProviderChain以了解在何处查找凭据。该列表(顺序很重要)是:
- 环境变量 - AWS_ACCESS_KEY_ID 和 AWS_SECRET_KEY
- Java 系统属性 - aws.accessKeyId 和 aws.secretKey
- 位于所有 AWS 开发工具包和 AWS CLI 共享的默认位置 (~/.aws/credentials) 的凭证配置文件文件
- 通过 Amazon EC2 元数据服务传递的实例配置文件凭证