PySpark中的随机数生成

zer*_*323 11 python random apache-spark rdd pyspark

让我们从一个简单的函数开始,它总是返回一个随机整数:

import numpy as np

def f(x):
    return np.random.randint(1000)
Run Code Online (Sandbox Code Playgroud)

和RDD填充零并映射使用f:

rdd = sc.parallelize([0] * 10).map(f)
Run Code Online (Sandbox Code Playgroud)

由于上面的RDD没有持久化,我希望每次收集时都会得到不同的输出:

> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
Run Code Online (Sandbox Code Playgroud)

如果我们忽略这样一个事实,即值的分布实际上并不是随机的,那么它或多或少都会发生.问题开始时我们只采用第一个元素:

assert len(set(rdd.first() for _ in xrange(100))) == 1
Run Code Online (Sandbox Code Playgroud)

要么

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
Run Code Online (Sandbox Code Playgroud)

它似乎每次返回相同的数字.我已经能够使用Spark 1.2,1.3和1.4在两台不同的机器上重现这种行为.我在这里使用,np.random.randint但它的行为方式相同random.randint.

这个问题与非完全随机的结果相同collect,似乎是Python特有的,我无法使用Scala重现它:

def f(x: Int) = scala.util.Random.nextInt(1000)

val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size

rdd.collect()
Run Code Online (Sandbox Code Playgroud)

我错过了一些明显的东西吗?

编辑:

原来问题的根源是Python RNG的实现.引用官方文档:

该模块提供的函数实际上是random.Random类的隐藏实例的绑定方法.您可以实例化您自己的Random实例,以获取不共享状态的生成器.

我假设NumPy以相同的方式工作并f使用RandomState实例重写如下

import os
import binascii

def f(x, seed=None):
    seed = (
        seed if seed is not None 
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)
Run Code Online (Sandbox Code Playgroud)

使它变慢但解决了问题.

虽然上面解释的不是来自collect的随机结果,但我仍然不理解它如何影响first/ take(1)在多个动作之间.

zer*_*323 4

所以这里的实际问题是比较简单的。Python 中的每个子进程都从其父进程继承其状态:

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1
Run Code Online (Sandbox Code Playgroud)

由于父状态在这种特定情况下没有理由改变,并且工作人员的寿命有限,因此每个孩子的状态在每次运行时都将完全相同。