在Spark RDD或dataframe中随机随机播放列

Ben*_*ann 8 apache-spark spark-dataframe

无论如何,我可以随机播放RDD或数据帧的列,以使该列中的条目以随机顺序出现?我不确定我可以使用哪些API来完成这样的任务.

Sas*_*ter 5

如何选择要洗牌orderBy(rand)的列,按索引将其压缩到现有数据框的方式呢?

import org.apache.spark.sql.functions.rand

def addIndex(df: DataFrame) = spark.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

case class Entry(name: String, salary: Double)

val r1 = Entry("Max", 2001.21)
val r2 = Entry("Zhang", 3111.32)
val r3 = Entry("Bob", 1919.21)
val r4 = Entry("Paul", 3001.5)

val df = addIndex(spark.createDataFrame(Seq(r1, r2, r3, r4)))
val df_shuffled = addIndex(df
  .select(col("salary").as("salary_shuffled"))
  .orderBy(rand))

df.join(df_shuffled, Seq("_index"))
  .drop("_index")
  .show(false) 

+-----+-------+---------------+
|name |salary |salary_shuffled|
+-----+-------+---------------+
|Max  |2001.21|3001.5         |
|Zhang|3111.32|3111.32        |
|Paul |3001.5 |2001.21        |
|Bob  |1919.21|1919.21        |
+-----+-------+---------------+
Run Code Online (Sandbox Code Playgroud)


jav*_*dba 2

虽然我们不能直接打乱单个列,但可以排列RDDvia中的记录RandomRDDshttps://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/random/RandomRDDs.html

仅排列单个列的潜在方法可能是:

  • 用于mapPartitions对每个 Worker 任务进行一些设置/拆卸
  • 将所有记录吸入内存。IE iterator.toList确保您有许多(/小)数据分区以避免 OOME
  • 使用 Row 对象重写除给定列之外的所有内容
  • 在mapPartitions中创建一个内存中的排序列表
  • 对于所需的列,将其值放入单独的集合中,并随机对集合进行采样以替换每个记录的条目
  • 返回结果list.toIteratormapPartitions