有没有办法将限制参数传递给Spark中的functions.collect_set?

use*_*142 14 aggregate-functions dataframe apache-spark apache-spark-sql

我正在处理一个大型Spark DataFrame中的一列数字,我想创建一个新列,它存储该列中出现的唯一数字的聚合列表.

基本上就是functions.collect_set的作用.但是,我只需要聚合列表中最多1000个元素.有没有办法以某种方式将该参数传递给functions.collect_set(),或者在不使用UDAF的情况下以任何其他方式在聚合列表中仅获取最多1000个元素?

由于列很大,我想避免收集所有元素并在之后修剪列表.

谢谢!

Jac*_*ski 5

我的解决方案与Loki的答案collect_set_limit非常相似.


我会使用一个UDF,它可以在你想要的之后collect_set(或者collect_list)或者更难的UDAF.

鉴于UDF的更多经验,我首先要考虑它.即使UDF没有经过优化,对于这个用例也没关系.

val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) }
val sample = spark.range(50).withColumn("key", $"id" % 5)

scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false)
+---+--------------------------------------+
|key|all                                   |
+---+--------------------------------------+
|0  |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|
|1  |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|
|3  |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|
|2  |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|
|4  |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|
+---+--------------------------------------+

scala> sample.
  groupBy("key").
  agg(collect_set("id") as "all").
  withColumn("limit(3)", limitUDF($"all", lit(3))).
  show(false)
+---+--------------------------------------+------------+
|key|all                                   |limit(3)    |
+---+--------------------------------------+------------+
|0  |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1  |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3  |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2  |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4  |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
Run Code Online (Sandbox Code Playgroud)

请参阅函数对象(对于udf函数的文档).

  • 看起来现在在较新版本的Spark中有一个`slice()`函数可以做这种事情. (3认同)

小智 1

使用采取

val firstThousand = rdd.take(1000)
Run Code Online (Sandbox Code Playgroud)

将返回前 1000 个。 Collect 还可以提供过滤功能。这将使您能够更具体地了解返回的内容。