为什么这个简单的Spark程序没有使用多个内核?

Met*_*est 4 python scala multicore bigdata apache-spark

所以,我在16核多核系统上运行这个简单的程序.我通过发出以下命令来运行它.

spark-submit --master local[*] pi.py
Run Code Online (Sandbox Code Playgroud)

该程序的代码如下.

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
Run Code Online (Sandbox Code Playgroud)

当我使用top来查看CPU消耗时,只使用了1个核心.为什么会这样?Seconldy,spark文档说默认并行性包含在属性 spark.default.parallelism中.如何从我的python程序中读取此属性?

Iva*_*rov 18

因为以上都没有真正对我有用(也许是因为我真的不理解它们),这是我的两分钱.

我在我spark-submit program.py的文件中开始工作sc = SparkContext("local", "Test").我试图验证核心数量火花 看到sc.defaultParallelism.事实证明它是1.当我将上下文初始化改为sc = SparkContext("local[*]", "Test")16时(我的系统的核心数),我的程序正在使用所有核心.

我很新的火花,但我的理解是,本地默认指示使用一个核心的,当它被设置里面的程序,它会覆盖其他设置(肯定在我的情况下,它会覆盖从配置文件和环境变量).

  • 另外,您可以将`*`替换为您希望使用的核心数.将其保留为"*"默认为本地核心的最大数量.使用`local`将其设置为1核心. (7认同)
  • 它只是有效.作为一个干净的解决方案,恕我直言,比第一个答案更好. (2认同)
  • 伟大的!但为什么没有在任何地方提到它?我的意思是,它甚至不直观,对吧?“local[*]”是一种什么样的语法?无论如何,如果这个答案不在这里,我会浪费几个小时挠头 (2认同)

Sve*_*end 5

可能是因为对sc.parallelize的调用将所有数据放入一个单独的分区.您可以将分区数指定为并行化的第二个参数:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)
Run Code Online (Sandbox Code Playgroud)

请注意,这仍然会在驱动程序中使用一个CPU生成1200万个点,然后仅将它们分散到16个分区以执行reduce步骤.

在分区之后,更好的方法是尝试完成大部分工作:例如,以下内容仅在驱动程序上生成一个小数组,然后让每个远程任务生成实际的随机数和随后的PI近似值:

part = 16
count = ( sc.parallelize([0] * part, part)
           .flatMap(lambda blah: [sample(p) for p in xrange( N/part)])
           .reduce(lambda a, b: a + b)
       )
Run Code Online (Sandbox Code Playgroud)

最后,(因为我们越懒越越好),spark mllib实际上已经有了一个很好地并行化的随机数据生成,请看一下:http://spark.apache.org/docs/1.1.0/mllib -statistics.html#random-data-generation.所以也许以下内容接近您尝试做的事情(未经测试=>可能无法正常工作,但希望能够接近)

count = ( RandomRDDs.uniformRDD(sc, N, part)
        .zip(RandomRDDs.uniformRDD(sc, N, part))
        .filter (lambda (x, y): x*x + y*y < 1)
        .count()
        )
Run Code Online (Sandbox Code Playgroud)