How to read input from S3 in a Spark Streaming EC2 cluster application

gpr*_*era 26 amazon-s3 amazon-ec2 apache-spark

I'm trying to get my Spark Streaming application to read its input from an S3 directory, but I keep getting this exception after launching it with the spark-submit script:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I'm setting those variables with this block of code, as suggested at http://spark.apache.org/docs/latest/ec2-scripts.html (bottom of the page):

val ssc = new org.apache.spark.streaming.StreamingContext(
  conf,
  Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))

args(2) and args(3) are, of course, my AWS Access Key ID and Secret Access Key.

Why does it keep saying they are not set?
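One possibility worth ruling out is that checkpoint() resolves the S3 path before the hadoopConfiguration.set calls run. A minimal sketch of an alternative way to hand over the same keys, assuming Spark's spark.hadoop.* passthrough (any property with that prefix is copied into the Hadoop configurations Spark builds); the app name is a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Set the credentials on the SparkConf before the StreamingContext exists,
// so every Hadoop Configuration Spark creates already carries them.
// args(2)/args(3) are the same key arguments as above.
val conf = new SparkConf()
  .setAppName("s3-streaming-sketch") // hypothetical name
  .set("spark.hadoop.fs.s3n.awsAccessKeyId", args(2))
  .set("spark.hadoop.fs.s3n.awsSecretAccessKey", args(3))
val ssc = new StreamingContext(conf, Seconds(60))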

EDIT: I also tried it this way, but I got the same exception:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")
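One caveat with this embedded-credentials form (a general s3n gotcha, assumed rather than confirmed in this thread): secret keys often contain '/' characters, which break the user:password parsing of the URL unless the secret is URL-encoded first. A minimal sketch:

import java.net.URLEncoder

// Encode the secret so any '/' in it survives the s3n URL parsing;
// <mybucket> remains a placeholder as in the original snippet.
val encodedSecret = URLEncoder.encode(args(3), "UTF-8")
val lines = ssc.textFileStream(s"s3n://${args(2)}:$encodedSecret@<mybucket>/path/")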

sam*_*est 25

Odd. Try also doing a .set on the sparkContext. Try exporting the env variables before you start the application:

export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>

^^ That's how we do it.
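Why exporting works (a rough paraphrase of the pre-1.3 behavior the update below refers to, not the actual Spark source): the driver used to mirror these environment variables into the s3/s3n credential keys of every Hadoop configuration it created, roughly like this:

// Approximate sketch of the old built-in behavior, for context only.
val hc = sc.hadoopConfiguration
for (id <- sys.env.get("AWS_ACCESS_KEY_ID");
     secret <- sys.env.get("AWS_SECRET_ACCESS_KEY")) {
  hc.set("fs.s3.awsAccessKeyId", id)
  hc.set("fs.s3n.awsAccessKeyId", id)
  hc.set("fs.s3.awsSecretAccessKey", secret)
  hc.set("fs.s3n.awsSecretAccessKey", secret)
}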

UPDATE: According to @tribbloid, the above broke in 1.3.0; now you have to mess around for ages with hdfs-site.xml, or you can do this (and it works in a spark-shell):

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
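A quick smoke test for the settings above (the bucket path is a placeholder): with fs.s3.impl mapped to NativeS3FileSystem, plain s3:// URIs pick up the fs.s3.* keys:

// Read a few lines to confirm the credentials are being used.
val sample = sc.textFile("s3://<mybucket>/path/")
sample.take(5).foreach(println)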

  • This no longer works as of Spark 1.3. Now, if you want to set it statically, you have to add hdfs-site.xml to Spark's conf directory; it can't be set on the command line. I don't know the point of this design, but that's how it is. (2 upvotes)

har*_*rel 22

The following configuration works for me; make sure you also set "fs.s3.impl":

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")      
val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
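Tying this back to the original stack trace (an illustrative sketch; the checkpoint directory is a placeholder): the exception came from StreamingContext.checkpoint, which resolves the path's filesystem eagerly, so with the keys in place that call should now succeed too:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Reuse the configured SparkContext from the snippet above; checkpoint()
// was the call that threw in the original question.
val ssc = new StreamingContext(sc, Seconds(60))
ssc.checkpoint("s3://<mybucket>/checkpoints/")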

  • It seems the hadoopConfiguration attribute isn't available in Python. Any ideas for a workaround? (2 upvotes)