IllegalArgumentException,从 s3 而不是 hdfs 指定输入/输出时出现错误的 FS

osk*_*osk 1 filesystems amazon-s3 amazon-web-services hdfs

我一直在本地集群上运行我的 Spark 作业,该集群具有从中读取输入和写入输出的 hdfs。现在我已经设置了一个 AWS EMR 和一个 S3 存储桶,我可以在其中输入我的输入,并且我希望我的输出也写入 S3。

错误:

用户类抛出异常:java.lang.IllegalArgumentException:错误 FS:s3://something/input,预期:hdfs://ip-some-numbers.eu-west-1.compute.internal:8020

我尝试搜索相同的问题,并且有几个关于此问题的问题。有人建议它仅用于输出,但即使我禁用输出,也会出现相同的错误。

另一个建议是有问题 FileSystem我的代码有问题。以下是我的程序中出现的所有输入/输出:

第一次发生在我的 customFileInputFormatgetSplits(JobContext job),我实际上并没有修改自己,但我可以:

FileSystem fs = path.getFileSystem(job.getConfiguration());
Run Code Online (Sandbox Code Playgroud)

类似的情况在我的自定义中RecordReader,我自己也没有修改过:

final FileSystem fs = file.getFileSystem(job);
Run Code Online (Sandbox Code Playgroud)

nextKeyValue()RecordReader自己写的习惯中,我使用:

FileSystem fs = FileSystem.get(jc);
Run Code Online (Sandbox Code Playgroud)

最后,当我想检测我使用的文件夹中的文件数时:

val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))
Run Code Online (Sandbox Code Playgroud)

我认为问题出在我的代码上,但是如何修改FileSystem调用以支持来自 S3 的输入/输出?

mah*_*hdi 5

这是我在 EMR 上启动 spark-job 时为解决此问题所做的工作:

 val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)
Run Code Online (Sandbox Code Playgroud)

确保将 s3_bucket 替换为您的存储桶名称

我希望这会对某人有所帮助