相关疑难解决方法(0)

处理Spark中的大型gzip压缩文件

我有一个来自s3的大型(大约85 GB压缩)gzip压缩文件,我正在尝试使用AWS EMR上的Spark处理(现在有一个m4.xlarge主实例和两个m4.10xlarge核心实例,每个实例都有一个100 GB的EBS卷) .我知道gzip是一种不可拆分的文件格式, 看到 建议应该重新分区压缩文件,因为Spark最初给出了一个带有一个分区的RDD.但是,做完之后

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields
scala> raw.count()
Run Code Online (Sandbox Code Playgroud)

并且看一下Spark应用程序UI,我仍然看到只有一个活动执行程序(其他14个已经死了)有一个任务,并且作业永远不会完成(或者至少我没有等待足够长的时间).

  • 这里发生了什么?有人可以帮我理解Spark在这个例子中是如何工作的吗?
  • 我应该使用不同的群集配置吗?
  • 不幸的是,我无法控制压缩模式,但有没有其他方法来处理这样的文件?

gzip amazon-emr apache-spark

9
推荐指数
2
解决办法
8785
查看次数

Spark Local Mode - 所有作业仅使用一个CPU核心

我们使用在单个AWS EC2实例上以本地模式运行Spark Java

"local[*]"

但是,使用New Relic工具进行性能分析和简单的"顶级"显示,我们已经编写了三个不同的Java spark作业,我们的16核心机器中只有一个CPU核心用过(我们也尝试过不同的AWS实例,但只有一个核心永远使用).

Runtime.getRuntime().availableProcessors()报告16个处理器和 sparkContext.defaultParallelism()16个报告.

我查看了各种Stackoverflow本地模式问题,但似乎没有解决问题.

任何建议都非常感谢.

谢谢

编辑:过程

1)使用sqlContext从光盘(S3)使用com.databricks.spark.csv读取gzip压缩的CSV文件1到DataFrame DF1.

2)使用sqlContext从光盘(S3)使用com.databricks.spark.csv将gzip压缩的CSV文件2读入DataFrame DF2.

3)使用DF1.toJavaRDD().mapToPair(返回元组的新映射函数)RDD1

4)使用DF2.toJavaRDD().mapToPair(返回元组的新映射函数)RDD2

5)在RDD上调用union

6)将联合RDD上的reduceByKey()调用为"按键合并",因此具有仅具有特定键的一个实例的元组>(因为同一键出现在RDD1和RDD2中).

7)调用.values().map(新映射函数,它迭代提供的List中的所有项目,并根据需要合并它们以返回相同或更小长度的List

8)调用.flatMap()来获取RDD

9)使用sqlContext从DomainClass类型的平面地图创建DataFrame

10)使用DF.coalease(1).write()将DF作为gzip压缩写入S3.

java amazon-ec2 amazon-web-services apache-spark

6
推荐指数
1
解决办法
1485
查看次数