Der*_*k_M 4 hadoop amazon-s3 amazon-web-services apache-spark
所以这个问题一直让我感到疯狂,并且开始感觉像s3的火花并不是这项特定工作的正确工具.基本上,我在s3存储桶中有数百万个较小的文件.由于我无法进入的原因,这些文件无法合并(一个是唯一的加密转录本).我见过类似的问题,每一个解决方案都没有产生好的结果.我尝试的第一件事是外卡:
sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();
Run Code Online (Sandbox Code Playgroud)
注意:计数更多地调试处理文件所需的时间.这项工作几乎花了一整天时间超过10个实例,但仍未通过列表底部发布的错误.然后我找到了这个链接,它基本上说这不是最佳的:https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from- S3-my.html
然后,我决定尝试另一种我目前无法找到的解决方案,即加载所有路径,然后联合所有rdds
ObjectListing objectListing = s3Client.listObjects(bucket);
List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
//initializes objectListing
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
while(objectListing.isTruncated()) {
objectListing = s3Client.listNextBatchOfObjects(objectListing);
tempMeta.addAll(objectListing.getObjectSummaries().stream()
.map(func)
.filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
.map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
.collect(Collectors.toList()));
if (tempMeta.size() > 5000) {
rdds.addAll(tempMeta);
tempMeta = new ArrayList<>();
}
}
if (!tempMeta.isEmpty()){
rdds.addAll(tempMeta);
}
return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size()));
Run Code Online (Sandbox Code Playgroud)
然后,即使我设置emrfs-site配置为:
{
"Classification": "emrfs-site",
"Properties": {
"fs.s3.consistent.retryPolicyType": "fixed",
"fs.s3.consistent.retryPeriodSeconds": "15",
"fs.s3.consistent.retryCount": "20",
"fs.s3.enableServerSideEncryption": "true",
"fs.s3.consistent": "false"
}
}
Run Code Online (Sandbox Code Playgroud)
每次我尝试运行这个工作时,我都会在6小时内收到此错误:
17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond
Run Code Online (Sandbox Code Playgroud)
首先,有没有办法使用来自s3的火花的较小文件?我不在乎解决方案是不是最理想的,我只是想尝试让事情变得有效.我考虑尝试火花流,因为它的内部与加载所有文件有点不同.然后我会使用fileStream并将newFiles设置为false.然后我可以批量处理它们.然而,这并不是火花流的建立,所以我在走这条路线时会有冲突.
作为旁注,我在hdfs中生成了数百万个小文件,并尝试了相同的工作,并在一小时内完成.这让我觉得它具体是s3.另外,我使用的是s3a,而不是普通的s3.
Ste*_*ran 11
如果您使用的是亚马逊EMR,那么您需要使用s3:// URL; s3a://是用于ASF版本的.
一个大问题是在s3中列出目录树需要多长时间,尤其是递归树遍历.spark代码假定它是一个快速的文件系统,其中列出dirs和stating文件是低成本的,而实际上每个操作需要1-4个HTTPS请求,即使在重用的HTTP/1.1连接上也会受到伤害.它可以很慢,你可以看到日志中的暂停.
真正受到伤害的地方在于它是前期分区,其中发生了很多延迟,所以这是序列化的工作正在被扼杀.
虽然作为S3a第二阶段工作的一部分,在S3a上进行的树上行走有一些加速,但是//* .txt形式的通配符扫描不会获得任何加速.我的建议是尝试展平您的目录结构,以便您从深树移动到浅层,甚至可能在同一目录中,以便可以在没有步行的情况下扫描它,每5000个条目需要1个HTTP请求.
请记住,无论如何,许多小文件都非常昂贵,包括HDFS,它们会耗尽存储空间.有一种特殊的聚合格式,HAR文件,就像tar文件,除了hadoop,hive和spark都可以在文件本身内部工作.这可能会有所帮助,尽管我没有看到任何实际的性能测试数据.
| 归档时间: |
|
| 查看次数: |
2328 次 |
| 最近记录: |