How to add a new column with random values to an existing DataFrame using Scala

air*_*man 3 random scala user-defined-functions apache-spark apache-spark-sql

I have a DataFrame backed by a Parquet file and I need to add a new column with some random data, but the random values have to be different from each other. This is my current code, and the Spark version is 1.5.1-cdh-5.5.2:

val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686 
mydf.cache

val r = scala.util.Random
import org.apache.spark.sql.functions.{lit, udf}
def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))

With this code I get the following data:

scala> myNewDF.select("myNewColumn").show(10,false)
+-----------+
|myNewColumn|
+-----------+
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
+-----------+

It looks like the udf myNextPositiveNumber is only called once, doesn't it?

Update: as confirmed, there is only one distinct value:

scala> myNewDF.select("myNewColumn").distinct.show(50,false)
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
...

+-----------+                                                                   
|myNewColumn|
+-----------+
|889488717D |
+-----------+
Run Code Online (Sandbox Code Playgroud)

What am I doing wrong?

Update 2: finally, with the help of @user6910411, I have this code:

val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686 
mydf.cache

val r = scala.util.Random

import org.apache.spark.sql.functions.udf

val accum = sc.accumulator(1)

def myNextPositiveNumber():String = {
   accum+=1
   accum.value.toString.concat("D")
}

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))

myNewDF.select("myNewColumn").count

// 63385686

Update 3

The current code produces data like this:

scala> mydf.select("myNewColumn").show(5,false)
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-----------+
|myNewColumn|
+-----------+
|2D         |
|2D         |
|2D         |
|2D         |
|2D         |
+-----------+
only showing top 5 rows

It looks like the udf function is only called once, doesn't it? I need a new random element in every row of that column.

Update 4 (@user6910411)

I now have this code: the id increases, but it does not concatenate the trailing character, which is strange. This is my code:

import org.apache.spark.sql.functions.{expr, monotonically_increasing_id, udf}


val mydf = sqlContext.read.parquet("some.parquet")

mydf.cache

def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))

scala> myNewDF.select("myNewColumn").show(5,false)
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_4_0]
+-----------+
|myNewColumn|
+-----------+
|0          |
|1          |
|2          |
|3          |
|4          |
+-----------+

I need something like this:

+-----------+
|myNewColumn|
+-----------+
|1D         |
|2D         |
|3D         |
|4D         |
+-----------+
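For the desired output above, a minimal expression-only sketch (assuming a Spark version where monotonically_increasing_id is available, i.e. 1.6+, and that unique but not necessarily consecutive ids are acceptable) would build the column from built-in functions instead of a Scala def, so the id is computed per row and the suffix is appended as a column operation:

import org.apache.spark.sql.functions.{concat, lit, monotonically_increasing_id}

// Sketch: per-row id cast to string, with the "D" suffix appended as a column expression.
// Note: monotonically_increasing_id() guarantees unique, increasing ids, not consecutive ones.
val myNewDF = mydf.withColumn(
  "myNewColumn",
  concat(monotonically_increasing_id().cast("string"), lit("D"))
)

myNewDF.select("myNewColumn").show(5, false)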

use*_*411 14

Spark >= 2.3

Some of the optimizations can be disabled using the asNondeterministic method:

import org.apache.spark.sql.expressions.UserDefinedFunction

val f: UserDefinedFunction = ???
val fNonDeterministic: UserDefinedFunction = f.asNondeterministic
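For example, a minimal sketch of how the random UDF from the question could be marked non-deterministic (Spark >= 2.3; the DataFrame and column names are taken from the question):

import org.apache.spark.sql.functions.udf

// Sketch: wrap the random generator in a UDF and mark it non-deterministic so that
// Catalyst does not fold repeated invocations into a single constant value.
val randomId = udf(() => (scala.util.Random.nextInt(Integer.MAX_VALUE) + 1).toString + "D")
  .asNondeterministic()

val myNewDF = mydf.withColumn("myNewColumn", randomId())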

Before using this option, make sure you understand the guarantees.

Spark < 2.3

The function passed to udf should be deterministic (with the possible exception of SPARK-20586), and calls to nullary functions may be replaced by constants. If you want to generate random numbers, use the built-in functions:

  • rand - generates a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].
  • randn - generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.

and transform the output to obtain the required distribution, for example:

(rand * Integer.MAX_VALUE).cast("bigint").cast("string")
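Applied to the question's format, a minimal sketch (it reuses mydf and myNewColumn from the question, and assumes the occasional duplicates inherent in sampling ~63M values from a 2^31 range are acceptable):

import org.apache.spark.sql.functions.{concat, lit, rand}

// Sketch: scale a uniform [0.0, 1.0) sample up to a positive integer, cast it to a
// string, and append the "D" suffix, all with built-in column expressions.
val myNewDF = mydf.withColumn(
  "myNewColumn",
  concat((rand() * Integer.MAX_VALUE).cast("bigint").cast("string"), lit("D"))
)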