Spark Scala将数据帧拆分为相同的行数

Ale*_*rte 7 scala dataframe apache-spark

我有一个Dataframe,希望将它分成相同数量的行.

换句话说,我想要一个数据帧列表,其中每个数据帧都是原始数据帧的脱节子集.

假设输入dataframer如下:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+
Run Code Online (Sandbox Code Playgroud)

我希望将它拆分为K个相等大小的数据帧.如果k = 4,则可能的结果是:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+
Run Code Online (Sandbox Code Playgroud)

Ste*_*itz 6

另一种解决方案是使用 limit 和 except。以下程序将返回一个具有相同行数的 Dataframe 的数组。除了第一个可能包含较少的行。

var numberOfNew = 4
var input = List(1,2,3,4,5,6,7,8,9).toDF
var newFrames = 0 to numberOfNew map (_ => Seq.empty[Int].toDF) toArray
var size = input.count();
val limit = (size / numberOfNew).toInt

while (size > 0) {
    newFrames(numberOfNew) = input.limit(limit)
    input = input.except(newFrames(numberOfNew))
    size = size - limit
    numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

+-----+
|value|
+-----+
|    7|
+-----+

+-----+
|value|
+-----+
|    4|
|    8|
+-----+

+-----+
|value|
+-----+
|    5|
|    9|
+-----+

...
Run Code Online (Sandbox Code Playgroud)


Ram*_*jan 3

根据我对您的输入和所需输出的理解,您可以通过row numberswithgrouping创建。dataframeone groupId

然后您可以根据您的需要在其他地方进行filter dataframe比较。row numberstoring

以下是满足您需求的临时解决方案。您可以根据您的需要进行更改

val k = 4

val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")

val newDF = dataFrame.withColumn("grouped", lit("grouping"))

var latestDF = newDF.withColumn("row", row_number() over windowSpec)

val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k

while(lowLimit < totalCount){
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}
Run Code Online (Sandbox Code Playgroud)

我希望这会给您一个良好的开始。