Ale*_*rte 7 scala dataframe apache-spark
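I have a DataFrame that I want to split into parts with the same number of rows.
In other words, I want a list of DataFrames, each of which is a disjoint subset of the original DataFrame.
Suppose the input DataFrame is as follows: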
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 100|[15.5097750043269...|
|15.509775004326936| 0| 101|[15.5097750043269...|
|15.509775004326936| 0| 102|[15.5097750043269...|
|15.509775004326936| 0| 103|[15.5097750043269...|
|15.509775004326936| 0| 104|[15.5097750043269...|
|15.509775004326936| 0| 105|[15.5097750043269...|
|15.509775004326936| 0| 106|[15.5097750043269...|
|15.509775004326936| 0| 107|[15.5097750043269...|
|15.509775004326936| 0| 108|[15.5097750043269...|
|15.509775004326936| 0| 109|[15.5097750043269...|
|15.509775004326936| 0| 110|[15.5097750043269...|
|15.509775004326936| 0| 111|[15.5097750043269...|
|15.509775004326936| 0| 112|[15.5097750043269...|
|15.509775004326936| 0| 113|[15.5097750043269...|
|15.509775004326936| 0| 114|[15.5097750043269...|
|15.509775004326936| 0| 115|[15.5097750043269...|
| 43.01955000865387| 0| 116|[43.0195500086538...|
+------------------+-----------+-----+--------------------+
I want to split it into K DataFrames of equal size. If k = 4, one possible result is:
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 106|[15.5097750043269...|
|15.509775004326936| 0| 107|[15.5097750043269...|
|15.509775004326936| 0| 110|[15.5097750043269...|
|15.509775004326936| 0| 111|[15.5097750043269...|
+------------------+-----------+-----+--------------------+
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 104|[15.5097750043269...|
|15.509775004326936| 0| 108|[15.5097750043269...|
|15.509775004326936| 0| 112|[15.5097750043269...|
|15.509775004326936| 0| 114|[15.5097750043269...|
+------------------+-----------+-----+--------------------+
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 100|[15.5097750043269...|
|15.509775004326936| 0| 105|[15.5097750043269...|
|15.509775004326936| 0| 109|[15.5097750043269...|
|15.509775004326936| 0| 115|[15.5097750043269...|
+------------------+-----------+-----+--------------------+
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 101|[15.5097750043269...|
|15.509775004326936| 0| 102|[15.5097750043269...|
|15.509775004326936| 0| 103|[15.5097750043269...|
|15.509775004326936| 0| 113|[15.5097750043269...|
| 43.01955000865387| 0| 116|[43.0195500086538...|
+------------------+-----------+-----+--------------------+
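Another solution is to use limit and except. The following program returns an array of k + 1 DataFrames: each of the last k holds size / k rows, and the first one holds the remaining size % k rows, so it may contain fewer rows (or none).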
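// Assumes a SparkSession named spark is in scope (as in spark-shell)
import spark.implicits._

var numberOfNew = 4
var input = List(1, 2, 3, 4, 5, 6, 7, 8, 9).toDF
// k + 1 slots: slots k..1 get size / k rows each, slot 0 gets the leftover rows
val newFrames = (0 to numberOfNew).map(_ => Seq.empty[Int].toDF).toArray
val size = input.count()
val limit = (size / numberOfNew).toInt
while (numberOfNew > 0) {
  // limit() without orderBy is not deterministic, and except() is a set
  // difference, so duplicate rows in the input are collapsed
  newFrames(numberOfNew) = input.limit(limit)
  input = input.except(newFrames(numberOfNew))
  numberOfNew = numberOfNew - 1
}
newFrames(0) = input // the size % k remaining rows (possibly none)
newFrames.foreach(_.show)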
+-----+
|value|
+-----+
| 7|
+-----+
+-----+
|value|
+-----+
| 4|
| 8|
+-----+
+-----+
|value|
+-----+
| 5|
| 9|
+-----+
...
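To make the slot layout of the limit/except loop concrete, here is a small plain-Scala model (no Spark involved; the object LimitExceptModel and the function slotSizes are hypothetical names) that computes how many rows each array slot receives for n input rows and k requested frames. It assumes the loop fills slots k down to 1 with n / k rows each and leaves whatever remains in slot 0.

```scala
// Plain-Scala model (no Spark) of the slot sizes produced by the limit/except loop:
// slots k..1 each receive n / k rows, slot 0 receives the n % k leftover rows.
object LimitExceptModel {
  def slotSizes(n: Long, k: Int): Vector[Long] = {
    val limit = n / k            // rows taken by each input.limit(limit) call
    var remaining = n            // rows still left in input after each except()
    val slots = Array.fill(k + 1)(0L)
    var slot = k
    while (slot > 0) {
      slots(slot) = limit
      remaining -= limit
      slot -= 1
    }
    slots(0) = remaining         // leftover rows end up in the first slot
    slots.toVector
  }

  def main(args: Array[String]): Unit = {
    println(slotSizes(9, 4))     // Vector(1, 2, 2, 2, 2)
    println(slotSizes(17, 4))    // Vector(1, 4, 4, 4, 4)
  }
}
```

For the 9-row example this gives a single leftover row in slot 0, which matches the one-row first frame in the output above.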
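Based on my understanding of your input and desired output, you can create row numbers by grouping the dataframe under one groupId. Then you can filter the dataframe by comparing the row number, and store the resulting pieces wherever you need them.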
The following is a quick solution to your requirement; you can adapt it to your needs.
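import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val k = 4 // rows per chunk here (not the number of chunks)
// A constant "grouped" column puts every row in one window partition, so
// row_number() numbers the whole DataFrame 1..n (on a single partition)
val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")
val newDF = dataFrame.withColumn("grouped", lit("grouping")) // dataFrame: the input
val latestDF = newDF.withColumn("row", row_number() over windowSpec)
val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k
// Show each chunk of rows with lowLimit < row <= highLimit
while (lowLimit < totalCount) {
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}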
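The row-number approach keeps the rows with lowLimit < row <= highLimit at each step, so k in that code acts as the chunk size. A plain-Scala model of those windows (no Spark; RowWindowModel, chunkBounds and chunkRowCounts are hypothetical names) shows what this means for the 17-row input in the question:

```scala
// Plain-Scala model (no Spark) of the row-number windows filtered by the while loop:
// each chunk keeps rows with lowLimit < row <= highLimit, stepping by chunkSize.
object RowWindowModel {
  // (lowLimit, highLimit) pairs; chunkSize must be > 0 or this never terminates
  def chunkBounds(totalCount: Long, chunkSize: Long): Vector[(Long, Long)] =
    Iterator
      .iterate(0L)(_ + chunkSize)          // successive lowLimit values
      .takeWhile(_ < totalCount)           // same test as while (lowLimit < totalCount)
      .map(low => (low, low + chunkSize))
      .toVector

  // Rows each window actually contains (row numbers run from 1 to totalCount)
  def chunkRowCounts(totalCount: Long, chunkSize: Long): Vector[Long] =
    chunkBounds(totalCount, chunkSize).map { case (low, high) =>
      math.min(high, totalCount) - low
    }

  def main(args: Array[String]): Unit = {
    println(chunkRowCounts(17, 4)) // Vector(4, 4, 4, 4, 1)
  }
}
```

With k = 4 used as the chunk size, the 17 rows therefore come out as five chunks, the last holding a single row, rather than four DataFrames.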
I hope this gives you a good start.
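If you want exactly k DataFrames whose sizes differ by at most one row, one option (not part of the answers above) is to give each row a bucket number from 1 to k and filter on it. Spark's ntile(k) window function in org.apache.spark.sql.functions returns such a bucket id over an ordered window. The sketch below is only a plain-Scala model of the bucket rule, assuming the standard SQL NTILE distribution in which the first n % k buckets get one extra row; EqualSplitModel, groupOf and groupSizes are hypothetical names.

```scala
// Plain-Scala model (no Spark) of splitting n rows into exactly k groups whose sizes
// differ by at most one: the first n % k groups get one extra row (NTILE-style rule).
object EqualSplitModel {
  // Maps a 1-based row number to a 1-based group id
  def groupOf(row: Long, n: Long, k: Int): Int = {
    val base = n / k                  // minimum rows per group
    val extra = n % k                 // number of groups that get base + 1 rows
    val bigRows = extra * (base + 1)  // rows covered by the larger groups
    if (row <= bigRows) ((row - 1) / (base + 1) + 1).toInt
    else (extra + (row - bigRows - 1) / base + 1).toInt // base > 0 whenever reached
  }

  // Size of each non-empty group, in group-id order
  def groupSizes(n: Long, k: Int): Vector[Int] =
    (1L to n).groupBy(groupOf(_, n, k)).toVector.sortBy(_._1).map(_._2.size)

  def main(args: Array[String]): Unit = {
    println(groupSizes(17, 4)) // Vector(5, 4, 4, 4)
    println(groupSizes(9, 4))  // Vector(3, 2, 2, 2)
  }
}
```

In Spark you could add such a column with ntile(k).over(...) on an ordered window, similar to the windowSpec in the answer above, and then call where once for each bucket value from 1 to k.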