Mic*_*son 14 amazon-ec2 apache-spark pyspark
我已经在Spark的独立模式下启动了一个带有ec2脚本的10节点集群.我正在从PySpark shell中访问s3存储桶中的数据,但是当我在RDD上执行transormations时,只使用了一个节点.例如,下面将读取CommonCorpus的数据:
bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
"/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
"-180-212-248.ec2.internal.warc.gz")
data = sc.textFile(bucket)
data.count()
Run Code Online (Sandbox Code Playgroud)
当我运行它时,我的10个从属中只有一个处理数据.我知道这一点,因为从Spark Web控制台查看时,只有一个从站(213)具有该活动的任何日志.当我在Ganglia中查看活动时,同一节点(213)是唯一一个在运行活动时内存使用量增加的奴隶.
此外,当我使用仅有一个slave的ec2集群运行相同的脚本时,我具有完全相同的性能.我正在使用Spark 1.1.0,非常感谢任何帮助或建议.
Nic*_*mas 18
...ec2.internal.warc.gz
我认为你使用gzip压缩文件遇到了一个相当典型的问题,因为它们无法并行加载.更具体地说,单个gzip压缩文件不能由多个任务并行加载,因此Spark将使用1个任务加载它,从而为您提供带有1个分区的RDD.
(但请注意,Spark可以并行加载10个gzip文件就好了;只是这10个文件中的每个文件只能由1个任务加载.你仍然可以跨文件获得并行,而不是在文件中.)
您可以通过显式检查RDD中的分区数来确认您只有1个分区:
data.getNumPartitions()
Run Code Online (Sandbox Code Playgroud)
可以在RDD上并行运行的任务数量的上限是RDD中的分区数或群集中的从属核心数,以较低者为准.
在您的情况下,它是RDD分区的数量.您可以通过按如下方式重新分区RDD来增加它:
data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)
Run Code Online (Sandbox Code Playgroud)
为什么sc.defaultParallelism * 3?
Spark Tuning指南建议每个核心有2-3个任务,并sc.defaultParalellism为您提供群集中的核心数.
| 归档时间: |
|
| 查看次数: |
2564 次 |
| 最近记录: |