collect_list通过保留基于另一个变量的顺序

Rav*_*avi 30 python apache-spark pyspark

我正在尝试使用现有列集上的groupby聚合在Pyspark中创建新列表.下面提供了一个示例输入数据框:

------------------------
id | date        | value
------------------------
1  |2014-01-03   | 10 
1  |2014-01-04   | 5
1  |2014-01-05   | 15
1  |2014-01-06   | 20
2  |2014-02-10   | 100   
2  |2014-03-11   | 500
2  |2014-04-15   | 1500
Run Code Online (Sandbox Code Playgroud)

预期的产出是:

id | value_list
------------------------
1  | [10, 5, 15, 20]
2  | [100, 500, 1500]
Run Code Online (Sandbox Code Playgroud)

列表中的值按日期排序.

我尝试使用collect_list如下:

from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
Run Code Online (Sandbox Code Playgroud)

但即使我在聚合之前按日期对输入数据框进行排序,collect_list也不保证顺序.

有人可以通过保留基于第二个(日期)变量的订单来帮助如何进行聚合吗?

TMi*_*hel 54

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('id').orderBy('date')

sorted_list_df = input_df.withColumn(
            'sorted_list', F.collect_list('value').over(w)
        )\
        .groupBy('id')\
        .agg(F.max('sorted_list').alias('sorted_list'))
Run Code Online (Sandbox Code Playgroud)

Window 用户提供的示例通常无法解释正在发生的事情,因此请让我为您剖析.

如您所知,collect_list与...一起使用groupBy将导致无序的值列表.这是因为根据数据的分区方式,一旦在组中找到一行,Spark就会将值附加到列表中.然后,订单取决于Spark如何计划您对执行程序的聚合.

Window功能可以控制这种情况下,通过一定值分组的行,以便可以执行的操作over得到的各组:

w = Window.partitionBy('id').orderBy('date')
Run Code Online (Sandbox Code Playgroud)
  • partitionBy - 您希望行的组/分区具有相同的组 id
  • orderBy - 您希望对组中的每一行进行排序 date

一旦定义了Window的范围 - "具有相同的行id,按行分类date" - ,您可以使用它来对其执行操作,在本例中为collect_list:

F.collect_list('value').over(w)
Run Code Online (Sandbox Code Playgroud)

此时,您创建了一个新列,sorted_list其中包含按日期排序的有序值列表,但您仍然拥有重复的行数id.要删除所需的重复行groupBy id并保留max每个组的值:

.groupBy('id')\
.agg(F.max('sorted_list').alias('sorted_list'))
Run Code Online (Sandbox Code Playgroud)

  • 由于使用Spark基本功能,这应该是可接受的答案-非常好! (8认同)
  • 需要最大值,因为对于相同的“id”,会按排序顺序为每行创建一个列表:[10],然后是 [10, 5],然后是 [10, 5, 15],然后是 [10, 5, 15, 20] id=1。取最大列表需要最长的列表(此处为 [10, 5, 15, 20])。 (8认同)
  • 这对记忆有何影响?当我们处理超过十亿个事件的链接时,当一个链在收集列表中最多可以有 10.000 个项目时,这种方法是否比公认的答案更好? (5认同)
  • 这不是很广阔吗?如果我有1000万组,每组有24个元素。`F.collect_list('value').over(w)` 将创建一个新的列大小从 1 到 24 ,即 1000 万 * 24 次。然后通过从每个组中获取最大的行来进行另一组。 (3认同)
  • 如果您使用“collect_set”而不是“collect_list”,则此方法不起作用。 (2认同)
  • 如果单个 id 有大量行,这似乎不可行。如果 id=1 有 n 行长数据类型,那么在 groupby 之前,您需要存储 (n^2)/2 个长数据,例如,如果 n=10**7 那么您将需要 400 TB仅列,如果 n=10**8,则需要 40 PB 等 (2认同)

mto*_*oto 35

如果您将日期和值都收集为列表,则可以使用和根据日期对结果列进行排序udf,然后仅保留结果中的值.

import operator
import pyspark.sql.functions as F

# create list column
grouped_df = input_df.groupby("id") \
               .agg(F.collect_list(F.struct("date", "value")) \
               .alias("list_col"))

# define udf
def sorter(l):
  res = sorted(l, key=operator.itemgetter(0))
  return [item[1] for item in res]

sort_udf = F.udf(sorter)

# test
grouped_df.select("id", sort_udf("list_col") \
  .alias("sorted_list")) \
  .show(truncate = False)
+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+
Run Code Online (Sandbox Code Playgroud)

  • 分布式系统中的顺序通常是没有意义的,因此除非每个 id 的值都在一个分区中,否则无法保证正确的顺序。 (2认同)

小智 27

您可以使用 sort_array 函数。如果您将日期和值收集为一个列表,您可以使用 sorry_array 对结果列进行排序并仅保留您需要的列。

import operator
import pyspark.sql.functions as F

grouped_df = input_df.groupby("id") \
               .agg(F.sort_array(F.collect_list(F.struct("date", "value"))) \
.alias("collected_list")) \
.withColumn("sorted_list",col("collected_list.value")) \
.drop("collected_list")
.show(truncate=False)

+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+ ```````
Run Code Online (Sandbox Code Playgroud)

  • 多谢。我找到 Window.patitionBy 然后获取最大行无法对大数据执行。您的解决方案速度快了约 200 倍。 (6认同)
  • 我不知道 Spark 将collected_list.value 这个概念理解为相应字段值的数组。好的! (2认同)

Art*_*yan 10

问题出在PySpark上,但也可能对Scala Spark也有帮助.

让我们准备测试数据帧:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{ Window, UserDefinedFunction}

import java.sql.Date
import java.time.LocalDate

val spark: SparkSession = ...

// Out test data set
val data: Seq[(Int, Date, Int)] = Seq(
  (1, Date.valueOf(LocalDate.parse("2014-01-03")), 10),
  (1, Date.valueOf(LocalDate.parse("2014-01-04")), 5),
  (1, Date.valueOf(LocalDate.parse("2014-01-05")), 15),
  (1, Date.valueOf(LocalDate.parse("2014-01-06")), 20),
  (2, Date.valueOf(LocalDate.parse("2014-02-10")), 100),
  (2, Date.valueOf(LocalDate.parse("2014-02-11")), 500),
  (2, Date.valueOf(LocalDate.parse("2014-02-15")), 1500)
)

// Create dataframe
val df: DataFrame = spark.createDataFrame(data)
  .toDF("id", "date", "value")
df.show()
//+---+----------+-----+
//| id|      date|value|
//+---+----------+-----+
//|  1|2014-01-03|   10|
//|  1|2014-01-04|    5|
//|  1|2014-01-05|   15|
//|  1|2014-01-06|   20|
//|  2|2014-02-10|  100|
//|  2|2014-02-11|  500|
//|  2|2014-02-15| 1500|
//+---+----------+-----+
Run Code Online (Sandbox Code Playgroud)

使用UDF

// Group by id and aggregate date and value to new column date_value
val grouped = df.groupBy(col("id"))
  .agg(collect_list(struct("date", "value")) as "date_value")
grouped.show()
grouped.printSchema()
// +---+--------------------+
// | id|          date_value|
// +---+--------------------+
// |  1|[[2014-01-03,10],...|
// |  2|[[2014-02-10,100]...|
// +---+--------------------+

// udf to extract data from Row, sort by needed column (date) and return value
val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
  rows.map { case Row(date: Date, value: Int) => (date, value) }
    .sortBy { case (date, value) => date }
    .map { case (date, value) => value }
})

// Select id and value_list
val r1 = grouped.select(col("id"), sortUdf(col("date_value")).alias("value_list"))
r1.show()
// +---+----------------+
// | id|      value_list|
// +---+----------------+
// |  1| [10, 5, 15, 20]|
// |  2|[100, 500, 1500]|
// +---+----------------+
Run Code Online (Sandbox Code Playgroud)

使用窗口

val window = Window.partitionBy(col("id")).orderBy(col("date"))
val sortedDf = df.withColumn("values_sorted_by_date", collect_list("value").over(window))
sortedDf.show()
//+---+----------+-----+---------------------+
//| id|      date|value|values_sorted_by_date|
//+---+----------+-----+---------------------+
//|  1|2014-01-03|   10|                 [10]|
//|  1|2014-01-04|    5|              [10, 5]|
//|  1|2014-01-05|   15|          [10, 5, 15]|
//|  1|2014-01-06|   20|      [10, 5, 15, 20]|
//|  2|2014-02-10|  100|                [100]|
//|  2|2014-02-11|  500|           [100, 500]|
//|  2|2014-02-15| 1500|     [100, 500, 1500]|
//+---+----------+-----+---------------------+

val r2 = sortedDf.groupBy(col("id"))
  .agg(max("values_sorted_by_date").as("value_list")) 
r2.show()
//+---+----------------+
//| id|      value_list|
//+---+----------------+
//|  1| [10, 5, 15, 20]|
//|  2|[100, 500, 1500]|
//+---+----------------+
Run Code Online (Sandbox Code Playgroud)


Sha*_*ego 5

为了确保对每个id进行排序,我们可以使用sortWithinPartitions:

from pyspark.sql import functions as F
ordered_df = (
    input_df
        .repartition(input_df.id)
        .sortWithinPartitions(['date'])


)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
Run Code Online (Sandbox Code Playgroud)

  • 分组是在排序之后发生的。排序顺序会保留在分组中吗?没有这样的保证 AFAIK (5认同)

Far*_*din 5

在 Spark SQL 世界中,这个问题的答案是:

SELECT 
browser, max(list)
from (
  SELECT
    id,
    COLLECT_LIST(value) OVER (PARTITION BY id ORDER BY date DESC) as list
  FROM browser_count
  GROUP BYid, value, date) 
Group by browser;
Run Code Online (Sandbox Code Playgroud)