如何使用apache spark处理数百万个较小的s3文件

Der*_*k_M 4 hadoop amazon-s3 amazon-web-services apache-spark

所以这个问题一直让我感到疯狂,并且开始感觉像s3的火花并不是这项特定工作的正确工具.基本上,我在s3存储桶中有数百万个较小的文件.由于我无法进入的原因,这些文件无法合并(一个是唯一的加密转录本).我见过类似的问题,每一个解决方案都没有产生好的结果.我尝试的第一件事是外卡:

sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();
Run Code Online (Sandbox Code Playgroud)

注意:计数更多地调试处理文件所需的时间.这项工作几乎花了一整天时间超过10个实例,但仍未通过列表底部发布的错误.然后我找到了这个链接,它基本上说这不是最佳的:https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from- S3-my.html

然后,我决定尝试另一种我目前无法找到的解决方案,即加载所有路径,然后联合所有rdds

    ObjectListing objectListing = s3Client.listObjects(bucket);
    List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
    List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
    //initializes objectListing
    tempMeta.addAll(objectListing.getObjectSummaries().stream()
            .map(func)
            .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
            .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
            .collect(Collectors.toList()));

    while(objectListing.isTruncated()) {
        objectListing = s3Client.listNextBatchOfObjects(objectListing);
        tempMeta.addAll(objectListing.getObjectSummaries().stream()
                .map(func)
                .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
                .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
                .collect(Collectors.toList()));
        if (tempMeta.size() > 5000) {
            rdds.addAll(tempMeta);
            tempMeta = new ArrayList<>();
        }
    }

    if (!tempMeta.isEmpty()){
        rdds.addAll(tempMeta);
    }
    return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size()));
Run Code Online (Sandbox Code Playgroud)

然后,即使我设置emrfs-site配置为:

{
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent.retryPolicyType": "fixed",
      "fs.s3.consistent.retryPeriodSeconds": "15",
      "fs.s3.consistent.retryCount": "20",
      "fs.s3.enableServerSideEncryption": "true",
      "fs.s3.consistent": "false"
    }
}
Run Code Online (Sandbox Code Playgroud)

每次我尝试运行这个工作时,我都会在6小时内收到此错误:

17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond
Run Code Online (Sandbox Code Playgroud)

首先,有没有办法使用来自s3的火花的较小文件?我不在乎解决方案是不是最理想的,我只是想尝试让事情变得有效.我考虑尝试火花流,因为它的内部与加载所有文件有点不同.然后我会使用fileStream并将newFiles设置为false.然后我可以批量处理它们.然而,这并不是火花流的建立,所以我在走这条路线时会有冲突.

作为旁注,我在hdfs中生成了数百万个小文件,并尝试了相同的工作,并在一小时内完成.这让我觉得它具体是s3.另外,我使用的是s3a,而不是普通的s3.

Ste*_*ran 11

如果您使用的是亚马逊EMR,那么您需要使用s3:// URL; s3a://是用于ASF版本的.

一个大问题是在s3中列出目录树需要多长时间,尤其是递归树遍历.spark代码假定它是一个快速的文件系统,其中列出dirs和stating文件是低成本的,而实际上每个操作需要1-4个HTTPS请求,即使在重用的HTTP/1.1连接上也会受到伤害.它可以很慢,你可以看到日志中的暂停.

真正受到伤害的地方在于它是前期分区,其中发生了很多延迟,所以这是序列化的工作正在被扼杀.

虽然作为S3a第二阶段工作的一部分,在S3a上进行的树上行走有一些加速,但是//* .txt形式的通配符扫描不会获得任何加速.我的建议是尝试展平您的目录结构,以便您从深树移动到浅层,甚至可能在同一目录中,以便可以在没有步行的情况下扫描它,每5000个条目需要1个HTTP请求.

请记住,无论如何,许多小文件都非常昂贵,包括HDFS,它们会耗尽存储空间.有一种特殊的聚合格式,HAR文件,就像tar文件,除了hadoop,hive和spark都可以在文件本身内部工作.这可能会有所帮助,尽管我没有看到任何实际的性能测试数据.