并行读取S3中的多个文件(Spark,Java)

Nir*_*ira 10 java amazon-s3 apache-spark

我看到了一些关于此问题的讨论,但无法理解正确的解决方案:我想将S3中的几百个文件加载到RDD中.我现在就是这样做的:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
                withBucketName(...).
                withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));
Run Code Online (Sandbox Code Playgroud)

ReadFromS3Function不使用实际的阅读AmazonS3客户端:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            if (is != null) {
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();
Run Code Online (Sandbox Code Playgroud)

我从Scala中为同一个问题看到的答案中"有点"翻译了这个.我认为也可以传递整个路径列表sc.textFile(...),但我不确定哪种是最佳实践方式.

Ste*_*ran 7

潜在的问题是在s3中列出对象的速度非常慢,并且当某些事情进行了树木行走时(如路径的通配符模式加工),它看起来像目录树的方式会导致性能下降.

帖子中的代码正在进行全子列表,这提供了更好的性能,它实际上是Hadoop 2.8和s3a listFiles(路径,递归)附带的HADOOP-13208.

获得该列表后,您已经获得了对象路径的字符串,然后您可以将其映射到s3a/s3n路径,以便将spark作为文本文件输入处理,然后您可以将其应用于

val files = keys.map(key -> s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)
Run Code Online (Sandbox Code Playgroud)

根据要求,这是使用的java代码.

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); 
// repeat while objectListing truncated 
JavaRDD<String> events = sc.textFile(String.join(",", keys))
Run Code Online (Sandbox Code Playgroud)

请注意,我将s3n切换到s3a,因为如果CP上有hadoop-awsamazon-sdkJAR,则s3a连接器是您应该使用的连接器.这是更好的,它是由人(我)维护和测试火花工作负载的那个.请参阅Hadoop S3连接器的历史.

  • 尝试s3a,对我来说很好.需要`--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2`作为spark-submit的参数,我想也是`sc.hadoopConfiguration ().set("fs.s3.impl","org.apache.hadoop.fs.s3a.S3AFileSystem");`代码中的`.并且EMR应该具有IAM角色,具有从/向桶读取/写入的权限.这样你就可以做到'AmazonS3 s3 =新的AmazonS3Client();`并且会自动获取信用卡. (2认同)