我有一个火花工作,我在两个数据帧之间进行外连接.第一个数据帧的大小为260 GB,文件格式为文本文件,分为2200个文件,第二个数据帧的大小为2GB.然后将大约260 GB的数据帧输出写入S3需要很长时间超过2小时后我取消因为我已经在EMR上进行了大量更改.
这是我的群集信息.
emr-5.9.0
Master: m3.2xlarge
Core: r4.16xlarge 10 machines (each machine has 64 vCore, 488 GiB memory,EBS Storage:100 GiB)
Run Code Online (Sandbox Code Playgroud)
这是我正在设置的群集配置
capacity-scheduler yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site fs.s3.maxConnections: 200
spark maximizeResourceAllocation: true
spark-defaults spark.dynamicAllocation.enabled: true
Run Code Online (Sandbox Code Playgroud)
我尝试手动设置内存组件,如下所示,性能更好,但同样的事情又花了很长时间
--num-executors 60 - conf spark.yarn.executor.memoryOverhead = 9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead = 3072 --driver-memory 26G --execeror-cores 10 - driver-cores 3 --conf spark.default.parallelism = 1200
我没有使用默认分区将数据保存到S3.
添加有关作业和查询计划的所有详细信息,以便于理解.
真正的原因是分区.这大部分时间都在占用.因为我有2K文件,所以如果我使用像200这样的重新分区,输出文件以十万分之一形式出现,然后在spark中重新加载并不是一个好故事.
在下面Image GC对我来说太高了..请oi必须处理这个请建议如何?
下面是节点健康状态.这一点数据被保存到S3中,难怪为什么我只能看到两个节点处于活动状态并且都处于空闲状态.
这是加载时的集群细节.在这一点上,我可以看到集群已被充分利用,但在将数据保存到S3时,许多节点都是免费的.
最后这里是我的代码,我执行Join然后保存到S3 ...
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", …
Run Code Online (Sandbox Code Playgroud) I have a java application where i take files very small file (1KB) but large number of small file like in a minute i.e i am getting 20000 files in a minute. I am taking file and uploading into S3 .
I am running this in 10 parallel threads . Also i have to continuously run this application .
When this application runs for some days i get Out of memory error.
This is the exact error i get
# …
Run Code Online (Sandbox Code Playgroud) 我有一个应用程序,其中
每天会生成 100 万到 1 亿个甚至更多的小 xml 文件,我必须将其加载到 S3 存储桶之一中,就像仅加载单个 xml 文件一样。该 S3 存储桶链接到 CloudFront,以便我们世界各地的客户可以更快地访问 xml 文件。
除了成本部分之外,一切对我来说都很好。随着文件数量的增加,S3 put 请求的成本每天都在增加。文件应在出现后立即推送,并且应可从 cloudFront 访问。
有什么解决方案可以节省我的成本吗?xml 文件的大小最大为 2 kb。
让我在这里详细阐述我迄今为止尝试过的一些要点。我想在本地合并所有小 xml,然后将其推送到 S3,但问题是我们应该在 AWS 中进行一些计算,以将其再次提取到小文件中,因为最终用户只接受单独的 xml 文件。提取并创建小文件并再次保存到 S3 中的成本会更高。
因此,请随意建议一些可能适合此用例的其他系统,而不是 S3。我也尝试过 HBASE,但在 AWS 中运行和管理 HBASE 是一个问题。我们还尝试了 Dynamo DB,但成本也更高。
我的MapReduce必须从HBase读取记录并需要写入zip文件.我们的客户特别询问减速器输出文件应该只是.zip
文件.
为此,我编写了ZipFileOutputFormat
包装来压缩记录并写入zip文件.
此外,我们不能使用缓冲区并将所有行保留到缓冲区然后迭代,因为某些文件包含19GB的记录,那时它将抛出一个java.lang.OutOfMemoryError
.
一切似乎都好,但有一个问题:
该.zip
是越来越为每个键创建的文件.在我的输出文件中,我可以看到许多输出文件,这些是每行键分隔文件.我不知道如何将它组合在zip文件中.
这是我的实施 ZipFileOutputFormat.java
public class ZipFileOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static class ZipRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
private ZipOutputStream zipOut;
public ZipRecordWriter(FSDataOutputStream fileOut) {
zipOut = new ZipOutputStream(fileOut);
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
zipOut.closeEntry();
zipOut.finish();
zipOut.close();
zipOut.flush();
}
@Override
public void write(K key, V value) throws IOException {
String fname = null;
if (key instanceof …
Run Code Online (Sandbox Code Playgroud) 我想要使用Java程序的Linux系统的分区名称及其总空间,已用空间和可用空间。
对于Windows系统,我得到的是正确的值,但是在Linux中,我仅得到一个驱动器信息:
到目前为止,这是我尝试过的。
public class DiskSpace {
public static void main(String[] args) {
FileSystemView fsv = FileSystemView.getFileSystemView();
File[] drives = File.listRoots();
if (drives != null && drives.length > 0) {
for (File aDrive : drives) {
System.out.println("Drive Letter: " + aDrive);
System.out.println("\tType: " + fsv.getSystemTypeDescription(aDrive));
System.out.println("\tTotal space: " + aDrive.getTotalSpace());
System.out.println("\tFree space: " + aDrive.getFreeSpace());
System.out.println();
}
}
}
Run Code Online (Sandbox Code Playgroud) 这是我的联盟代码:
val dfToSave=dfMainOutput.union(insertdf.select(dfMainOutput).withColumn("FFAction", when($"FFAction" === "O" || $"FFAction" === "I", lit("I|!|")))
Run Code Online (Sandbox Code Playgroud)
当我结合时,我得到以下错误:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. string <> boolean at the 11th column of the second table;;
'Union
Run Code Online (Sandbox Code Playgroud)
这是两个数据帧的模式:
insertdf.printSchema()
root
|-- OrganizationID: long (nullable = true)
|-- SourceID: integer (nullable = true)
|-- AuditorID: integer (nullable = true)
|-- AuditorOpinionCode: string (nullable = true)
|-- AuditorOpinionOnInternalControlCode: string (nullable = true)
|-- AuditorOpinionOnGoingConcernCode: string (nullable = true)
|-- IsPlayingAuditorRole: boolean (nullable …
Run Code Online (Sandbox Code Playgroud) 我在S3中存储了大约一百万个文本文件。我想根据文件夹名称重命名所有文件。
我如何在Spark-Scala中做到这一点?
我正在寻找一些示例代码。
我正在使用齐柏林飞艇来运行我的spark脚本。
下面的代码我已经尝试从答案中建议
import org.apache.hadoop.fs._
val src = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN")
val dest = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN/dest")
val conf = sc.hadoopConfiguration // assuming sc = spark context
val fs = Path.getFileSystem(conf)
fs.rename(src, dest)
Run Code Online (Sandbox Code Playgroud)
但是低于错误
<console>:110: error: value getFileSystem is not a member of object org.apache.hadoop.fs.Path
val fs = Path.getFileSystem(conf)
Run Code Online (Sandbox Code Playgroud) scala amazon-s3 amazon-web-services apache-spark apache-zeppelin
我有格式良好的文本文件,如波纹管。
TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!|
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!|
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!|
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!|
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!|
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!|
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!|
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!|
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!|
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!|
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!|
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!|
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!|
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!|
Run Code Online (Sandbox Code Playgroud)
现在我必须将此文本文件加载到 spark 数据框中。
我可以这样做
val schema = StructType(Array(
StructField("OrgId", StringType),
StructField("LineItemId", StringType),
StructField("SegmentId", StringType), …
Run Code Online (Sandbox Code Playgroud)