Spark - Random Number Generation

Bri*_*ian 10 random scala apache-spark spark-dataframe

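I wrote a method that needs a random number to simulate a Bernoulli distribution. I use random.nextDouble to generate a number between 0 and 1, then make my decision based on my probability parameter.

My problem is that Spark generates the same random numbers in every iteration of my for loop's map function. I am using the DataFrame API. My code follows this format: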

val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map{row => RowFactory
      .create(row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble()))
    }, myDF.schema)
}

Here is the class:

class MyClass extends Serializable {
  val q = qProb

  def myMethod(s: String, rand: Double) = {
    if (rand <= q) // do something
    else // do something else
  }
}

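I need a new random number every time myMethod is called. I also tried generating the number inside my method with java.util.Random (scala.util.Random v10 does not extend Serializable), as shown below, but I still get the same numbers in every for loop: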

val r = new java.util.Random(s.hashCode.toLong)
val rand = r.nextDouble()

I did some research, and it seems this is related to Spark's deterministic nature.

Dav*_*fin 11

Just use the SQL function rand:

import org.apache.spark.sql.functions._

//df: org.apache.spark.sql.DataFrame = [key: int]

df.select($"key", rand() as "rand").show
+---+-------------------+
|key|               rand|
+---+-------------------+
|  1| 0.8635073400704648|
|  2| 0.6870153659986652|
|  3|0.18998048357873532|
+---+-------------------+


df.select($"key", rand() as "rand").show
+---+------------------+
|key|              rand|
+---+------------------+
|  1|0.3422484248879837|
|  2|0.2301384925817671|
|  3|0.6959421970071372|
+---+------------------+
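
To connect this to the Bernoulli decision in the question, the uniform column can be compared directly with the probability. The following is a minimal sketch, not the asker's actual method: it assumes Spark 2.x or later (SparkSession rather than sqlContext), and the object name BernoulliColumn, the helper withBernoulli, the column name "success" and the value q = 0.3 are all made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.rand

object BernoulliColumn {
  // Hypothetical helper: adds a Boolean column that is true with probability q.
  // rand() draws an independent uniform value in [0, 1) for every row.
  def withBernoulli(df: DataFrame, q: Double): DataFrame =
    df.withColumn("success", rand() <= q)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, only for trying the sketch out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("bernoulli-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("key")
    val q = 0.3 // assumed success probability

    // Each call builds a new unseeded rand(), so repeated passes differ.
    for (m <- 1 to 3) withBernoulli(df, q).show()

    spark.stop()
  }
}
```

If reproducible runs are needed, rand also accepts a seed (rand(seed)); in a loop the seed would then have to change per pass, otherwise every pass should see the same draws again.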


leo*_*o9r 6

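According to this post, the best solution is to put the new scala.util.Random neither inside the map nor completely outside it (i.e. in the driver code), but in between, in mapPartitionsWithIndex: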

import scala.util.Random
val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
   val rand = new scala.util.Random(indx+myAppSeed)
   iter.map(x => (x, Array.fill(10)(rand.nextDouble)))
}
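
Note that a seed built only from the partition index and the app seed should reproduce the same numbers on every pass of the question's for loop. One way around that is to mix the loop counter into the seed as well. The sketch below illustrates the idea without Spark, with plain collections standing in for partitions; the names PerPartitionSeeds, seedFor and draw, and the mixing formula itself, are assumptions made for illustration.

```scala
import scala.util.Random

object PerPartitionSeeds {
  // Hypothetical seed mixing: distinct for each (pass, partition) pair
  // as long as there are fewer than 1000003 partitions.
  def seedFor(appSeed: Long, pass: Int, partition: Int): Long =
    appSeed + 1000003L * pass + partition

  // Plain nested sequences stand in for the partitions of an RDD.
  def draw(partitions: Seq[Seq[String]], appSeed: Long, pass: Int): Seq[Seq[(String, Double)]] =
    partitions.zipWithIndex.map { case (rows, index) =>
      // One generator per partition, seeded from app seed, pass and index.
      val rand = new Random(seedFor(appSeed, pass, index))
      rows.map(row => (row, rand.nextDouble()))
    }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("a", "b"), Seq("c", "d"))
    // Each pass uses different seeds, yet rerunning the program repeats the output.
    for (pass <- 1 to 3) println(draw(partitions, 91234L, pass))
  }
}
```

In the Spark version, the same seedFor call would go inside mapPartitionsWithIndex, with indx as the partition argument and the loop variable m as the pass.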


Pas*_*ucy 5

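The same sequence repeats because the random generator is created and seeded before the data is partitioned. Each partition then starts from the same random seed. It may not be the most efficient approach, but the following should work: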

val myClass = new MyClass()
val M = 3

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map {
      val rand = scala.util.Random
      row => RowFactory
        .create(row.getString(0),
          myClass.myMethod(row.getString(2), rand.nextDouble()))
    }, myDF.schema)
}
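
The repeated-sequence effect described above can be reproduced without Spark. The sketch below rests on an assumption about the mechanism: a seeded generator captured by a closure is serialized on the driver, and every task works on its own deserialized copy. It mimics that with plain Java serialization; the names SeededCopyDemo, roundTrip and drawsPerCopy are made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SeededCopyDemo {
  // Round-trips a value through Java serialization, roughly what happens
  // to the values a closure captures when it is shipped to a task.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Draws n doubles from each of several copies of one seeded generator.
  def drawsPerCopy(seed: Long, copies: Int, n: Int): Seq[Seq[Double]] = {
    val driverSide = new java.util.Random(seed) // never advanced itself
    (1 to copies).map { _ =>
      val taskSide = roundTrip(driverSide) // each "task" gets a fresh copy
      Seq.fill(n)(taskSide.nextDouble())
    }
  }

  def main(args: Array[String]): Unit =
    // Every printed line is identical, because each copy starts in the same state.
    drawsPerCopy(seed = 91234L, copies = 3, n = 2).foreach(d => println(d.mkString(", ")))
}
```

Because the original generator is never advanced, every copy starts in the same state. That is consistent with seeing the same numbers in every partition and in every pass of the loop.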