lil*_*ffa 3 sql aggregate-functions apache-spark apache-spark-sql pyspark
我希望你能帮助我.我有一个DF如下:
val df = sc.parallelize(Seq(
(1, "a", "2014-12-01", "2015-01-01", 100),
(2, "a", "2014-12-01", "2015-01-02", 150),
(3, "a", "2014-12-01", "2015-01-03", 120),
(4, "b", "2015-12-15", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns")
.withColumn("dateTrans", to_date($"dateTrans"))
Run Code Online (Sandbox Code Playgroud)
我很乐意做一个groupBy prodId并汇总'value',将日期范围总结为'dateIns'和'dateTrans'列之间的差异.特别是,我想有一种方法来定义一个条件和,它总结了上述列之间预定义的最大差异内的所有值.即从dateIns('dateTrans' - 'dateIns'<= 10,20,30)10天,20天,30天之间发生的所有值.
在spark中是否有任何预定义的聚合函数允许进行条件求和?你建议开发一个aggr.UDF(如果是这样,任何建议)?我正在使用pySpqrk,但也很高兴获得Scala解决方案.非常感谢!
让我们更有趣一点,所以窗口中有一些事件:
val df = sc.parallelize(Seq(
(1, "a", "2014-12-30", "2015-01-01", 100),
(2, "a", "2014-12-21", "2015-01-02", 150),
(3, "a", "2014-12-10", "2015-01-03", 120),
(4, "b", "2014-12-05", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns"))
.withColumn("dateTrans", to_date($"dateTrans"))
Run Code Online (Sandbox Code Playgroud)
你需要的或多或少是这样的:
import org.apache.spark.sql.functions.{col, datediff, lit, sum}
// Find difference in tens of days
val diff = (datediff(col("dateTrans"), col("dateIns")) / 10)
.cast("integer") * 10
val dfWithDiff = df.withColumn("diff", diff)
val aggregated = dfWithDiff
.where((col("diff") < 30) && (col("diff") >= 0))
.groupBy(col("prodId"), col("diff"))
.agg(sum(col("value")))
Run Code Online (Sandbox Code Playgroud)
结果
aggregated.show
// +------+----+----------+
// |prodId|diff|sum(value)|
// +------+----+----------+
// | a| 20| 120|
// | b| 20| 100|
// | a| 0| 100|
// | a| 10| 150|
// +------+----+----------+
Run Code Online (Sandbox Code Playgroud)
其中diff
是范围的下限(0 - > [0,10),10 - > [10,20],...).如果您删除val
并调整导入,这也适用于PySpark .
编辑(每列聚合):
val exprs = Seq(0, 10, 20).map(x => sum(
when(col("diff") === lit(x), col("value"))
.otherwise(lit(0)))
.alias(x.toString))
dfWithDiff.groupBy(col("prodId")).agg(exprs.head, exprs.tail: _*).show
// +------+---+---+---+
// |prodId| 0| 10| 20|
// +------+---+---+---+
// | a|100|150|120|
// | b| 0| 0|100|
// +------+---+---+---+
Run Code Online (Sandbox Code Playgroud)
与Python等价:
from pyspark.sql.functions import *
def make_col(x):
cnd = when(col("diff") == lit(x), col("value")).otherwise(lit(0))
return sum(cnd).alias(str(x))
exprs = [make_col(x) for x in range(0, 30, 10)]
dfWithDiff.groupBy(col("prodId")).agg(*exprs).show()
## +------+---+---+---+
## |prodId| 0| 10| 20|
## +------+---+---+---+
## | a|100|150|120|
## | b| 0| 0|100|
## +------+---+---+---+
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
7534 次 |
最近记录: |