Spark Local Mode - 所有作业仅使用一个CPU核心

twi*_*911 6 java amazon-ec2 amazon-web-services apache-spark

我们使用在单个AWS EC2实例上以本地模式运行Spark Java

"local[*]"

但是,使用New Relic工具进行性能分析和简单的"顶级"显示,我们已经编写了三个不同的Java spark作业,我们的16核心机器中只有一个CPU核心用过(我们也尝试过不同的AWS实例,但只有一个核心永远使用).

Runtime.getRuntime().availableProcessors()报告16个处理器和 sparkContext.defaultParallelism()16个报告.

我查看了各种Stackoverflow本地模式问题,但似乎没有解决问题.

任何建议都非常感谢.

谢谢

编辑:过程

1)使用sqlContext从光盘(S3)使用com.databricks.spark.csv读取gzip压缩的CSV文件1到DataFrame DF1.

2)使用sqlContext从光盘(S3)使用com.databricks.spark.csv将gzip压缩的CSV文件2读入DataFrame DF2.

3)使用DF1.toJavaRDD().mapToPair(返回元组的新映射函数)RDD1

4)使用DF2.toJavaRDD().mapToPair(返回元组的新映射函数)RDD2

5)在RDD上调用union

6)将联合RDD上的reduceByKey()调用为"按键合并",因此具有仅具有特定键的一个实例的元组>(因为同一键出现在RDD1和RDD2中).

7)调用.values().map(新映射函数,它迭代提供的List中的所有项目,并根据需要合并它们以返回相同或更小长度的List

8)调用.flatMap()来获取RDD

9)使用sqlContext从DomainClass类型的平面地图创建DataFrame

10)使用DF.coalease(1).write()将DF作为gzip压缩写入S3.

Tim*_*Tim 4

我认为你的问题是你的 CSV 文件被压缩了。当 Spark 读取文件时,它会并行加载它们,但只有当文件编解码器是可拆分*时它才能执行此操作。纯文本(非压缩)文本和 parquet 以及bgzip基因组学(我的领域)中使用的编解码器都是可拆分的。您的整个文件最终都存放在一个分区中。

尝试解压缩 csv.gz 文件并再次运行。我想你会看到更好的结果!

  • 可分割格式意味着,如果给定一个任意文件偏移量来开始读取,您可以找到块中下一条记录的开头并解释它。Gzip 压缩的文件不可分割。

编辑:我在我的机器上复制了这种行为。sc.textFile在 3G gzip 压缩文本文件上使用生成 1 个分区。