小编SUD*_*HAN的帖子

如何在EMR上调整spark作业以在S3上快速写入大量数据

我有一个火花工作,我在两个数据帧之间进行外连接.第一个数据帧的大小为260 GB,文件格式为文本文件,分为2200个文件,第二个数据帧的大小为2GB.然后将大约260 GB的数据帧输出写入S3需要很长时间超过2小时后我取消因为我已经在EMR上进行了大量更改.

这是我的群集信息.

emr-5.9.0
Master:    m3.2xlarge
Core:      r4.16xlarge   10 machines (each machine has 64 vCore, 488 GiB memory,EBS Storage:100 GiB)
Run Code Online (Sandbox Code Playgroud)

这是我正在设置的群集配置

capacity-scheduler  yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site  fs.s3.maxConnections:   200
spark   maximizeResourceAllocation: true
spark-defaults  spark.dynamicAllocation.enabled:    true
Run Code Online (Sandbox Code Playgroud)

我尝试手动设置内存组件,如下所示,性能更好,但同样的事情又花了很长时间

--num-executors 60 - conf spark.yarn.executor.memoryOverhead = 9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead = 3072 --driver-memory 26G --execeror-cores 10 - driver-cores 3 --conf spark.default.parallelism = 1200

我没有使用默认分区将数据保存到S3.

添加有关作业和查询计划的所有详细信息,以便于理解.

真正的原因是分区.这大部分时间都在占用.因为我有2K文件,所以如果我使用像200这样的重新分区,输出文件以十万分之一形式出现,然后在spark中重新加载并不是一个好故事.

在下图中我不知道为什么在项目之后再次调用sort 在此输入图像描述

在下面Image GC对我来说太高了..请oi必须处理这个请建议如何? 执行者和GC细节

下面是节点健康状态.这一点数据被保存到S3中,难怪为什么我只能看到两个节点处于活动状态并且都处于空闲状态. 这是我的节点详细信息.此时数据将保存到S3中

这是加载时的集群细节.在这一点上,我可以看到集群已被充分利用,但在将数据保存到S3时,许多节点都是免费的. 充分利用的clsuter

最后这里是我的代码,我执行Join然后保存到S3 ...

import org.apache.spark.sql.expressions._

          val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", …
Run Code Online (Sandbox Code Playgroud)

amazon-emr hadoop2 apache-spark-sql spark-dataframe

17
推荐指数
1
解决办法
3837
查看次数

Java : Out Of Memory Error when my application runs for longer time

I have a java application where i take files very small file (1KB) but large number of small file like in a minute i.e i am getting 20000 files in a minute. I am taking file and uploading into S3 .

I am running this in 10 parallel threads . Also i have to continuously run this application .

When this application runs for some days i get Out of memory error.

This is the exact error i get

# …
Run Code Online (Sandbox Code Playgroud)

java multithreading amazon-s3

12
推荐指数
2
解决办法
924
查看次数

AWS S3 的小文件问题

我有一个应用程序,其中

每天会生成 100 万到 1 亿个甚至更多的小 xml 文件,我必须将其加载到 S3 存储桶之一中,就像仅加载单个 xml 文件一样。该 S3 存储桶链接到 CloudFront,以便我们世界各地的客户可以更快地访问 xml 文件。

除了成本部分之外,一切对我来说都很好。随着文件数量的增加,S3 put 请求的成本每天都在增加。文件应在出现后立即推送,并且应可从 cloudFront 访问。

有什么解决方案可以节省我的成本吗?xml 文件的大小最大为 2 kb。

让我在这里详细阐述我迄今为止尝试过的一些要点。我想在本地合并所有小 xml,然后将其推送到 S3,但问题是我们应该在 AWS 中进行一些计算,以将其再次提取到小文件中,因为最终用户只接受单独的 xml 文件。提取并创建小文件并再次保存到 S3 中的成本会更高。

因此,请随意建议一些可能适合此用例的其他系统,而不是 S3。我也尝试过 HBASE,但在 AWS 中运行和管理 HBASE 是一个问题。我们还尝试了 Dynamo DB,但成本也更高。

amazon-s3 amazon-web-services

6
推荐指数
1
解决办法
8259
查看次数

如何在hadoop MapReduce中从ZipOutputStream创建zip文件

我的MapReduce必须从HBase读取记录并需要写入zip文件.我们的客户特别询问减速器输出文件应该只是.zip文件.

为此,我编写了ZipFileOutputFormat包装来压缩记录并写入zip文件.

此外,我们不能使用缓冲区并将所有行保留到缓冲区然后迭代,因为某些文件包含19GB的记录,那时它将抛出一个java.lang.OutOfMemoryError.

一切似乎都好,但有一个问题:

.zip是越来越为每个键创建的文件.在我的输出文件中,我可以看到许多输出文件,这些是每行键分隔文件.我不知道如何将它组合在zip文件中.

这是我的实施 ZipFileOutputFormat.java

public class ZipFileOutputFormat<K, V> extends FileOutputFormat<K, V> {

    public static class ZipRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {

        private ZipOutputStream zipOut;

        public ZipRecordWriter(FSDataOutputStream fileOut) {
            zipOut = new ZipOutputStream(fileOut);
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            zipOut.closeEntry();
            zipOut.finish();
            zipOut.close();
            zipOut.flush();
        }
        @Override
        public void write(K key, V value) throws IOException {
            String fname = null;
            if (key instanceof …
Run Code Online (Sandbox Code Playgroud)

java zip mapreduce zipoutputstream hadoop2

5
推荐指数
0
解决办法
855
查看次数

Java程序如何列出所有分区并在Linux上获得它们的可用空间?

我想要使​​用Java程序的Linux系统的分区名称及其总空间,已用空间和可用空间。

对于Windows系统,我得到的是正确的值,但是在Linux中,我仅得到一个驱动器信息:

到目前为止,这是我尝试过的。

public class DiskSpace {

public static void main(String[] args) {
    FileSystemView fsv = FileSystemView.getFileSystemView();
    File[] drives = File.listRoots();
    if (drives != null && drives.length > 0) {
        for (File aDrive : drives) {
            System.out.println("Drive Letter: " + aDrive);
            System.out.println("\tType: " + fsv.getSystemTypeDescription(aDrive));
            System.out.println("\tTotal space: " + aDrive.getTotalSpace());
            System.out.println("\tFree space: " + aDrive.getFreeSpace());
            System.out.println();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

java linux file

2
推荐指数
1
解决办法
5016
查看次数

Union只能在具有兼容列类型Spark数据帧的表上执行

这是我的联盟代码:

val dfToSave=dfMainOutput.union(insertdf.select(dfMainOutput).withColumn("FFAction", when($"FFAction" === "O" || $"FFAction" === "I", lit("I|!|")))
Run Code Online (Sandbox Code Playgroud)

当我结合时,我得到以下错误:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. string <> boolean at the 11th column of the second table;;
'Union
Run Code Online (Sandbox Code Playgroud)

这是两个数据帧的模式:

insertdf.printSchema()
root
 |-- OrganizationID: long (nullable = true)
 |-- SourceID: integer (nullable = true)
 |-- AuditorID: integer (nullable = true)
 |-- AuditorOpinionCode: string (nullable = true)
 |-- AuditorOpinionOnInternalControlCode: string (nullable = true)
 |-- AuditorOpinionOnGoingConcernCode: string (nullable = true)
 |-- IsPlayingAuditorRole: boolean (nullable …
Run Code Online (Sandbox Code Playgroud)

union scala dataframe apache-spark apache-spark-sql

2
推荐指数
1
解决办法
5093
查看次数

如何在Spark Scala中重命名S3文件而不是HDFS

我在S3中存储了大约一百万个文本文件。我想根据文件夹名称重命名所有文件。

我如何在Spark-Scala中做到这一点?

我正在寻找一些示例代码。

我正在使用齐柏林飞艇来运行我的spark脚本。

下面的代码我已经尝试从答案中建议

import org.apache.hadoop.fs._

val src = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN")
val dest = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN/dest")
val conf = sc.hadoopConfiguration   // assuming sc = spark context
val fs = Path.getFileSystem(conf)
fs.rename(src, dest)
Run Code Online (Sandbox Code Playgroud)

但是低于错误

<console>:110: error: value getFileSystem is not a member of object org.apache.hadoop.fs.Path
       val fs = Path.getFileSystem(conf)
Run Code Online (Sandbox Code Playgroud)

scala amazon-s3 amazon-web-services apache-spark apache-zeppelin

1
推荐指数
1
解决办法
2152
查看次数

我们可以在不创建模式的情况下在 spark 数据框中加载分隔文本文件吗?

我有格式良好的文本文件,如波纹管。

TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!|
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!|
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!|
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!|
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!|
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!|
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!|
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!|
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!|
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!|
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!|
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!|
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!|
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!|
Run Code Online (Sandbox Code Playgroud)

现在我必须将此文本文件加载到 spark 数据框中。

我可以这样做

val schema = StructType(Array(

      StructField("OrgId", StringType),
      StructField("LineItemId", StringType),
      StructField("SegmentId", StringType), …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
1万
查看次数