Sir*_*uhn 5 scala apache-spark apache-spark-sql
我正在尝试计算order_id
过去 365 天内有多少订单付款。这不是问题:我使用window function。
凡我得到棘手的是:我不想计算的订单这个时间窗口,其中payment_date
为后order_date
的电流order_id
。
目前,我有这样的事情:
val window: WindowSpec = Window
.partitionBy("customer_id")
.orderBy("order_date")
.rangeBetween(-365*days, -1)
Run Code Online (Sandbox Code Playgroud)
和
df.withColumn("paid_order_count", count("*") over window)
Run Code Online (Sandbox Code Playgroud)
这将计算客户在当前订单之前的最后 365 天内的所有订单。
我现在如何合并考虑order_date
当前订单的计数条件 ?
例子:
+---------+-----------+-------------+------------+
|order_id |order_date |payment_date |customer_id |
+---------+-----------+-------------+------------+
|1 |2017-01-01 |2017-01-10 |A |
|2 |2017-02-01 |2017-02-10 |A |
|3 |2017-02-02 |2017-02-20 |A |
Run Code Online (Sandbox Code Playgroud)
结果表应如下所示:
+---------+-----------+-------------+------------+-----------------+
|order_id |order_date |payment_date |customer_id |paid_order_count |
+---------+-----------+-------------+------------+-----------------+
|1 |2017-01-01 |2017-01-10 |A |0 |
|2 |2017-02-01 |2017-02-10 |A |1 |
|3 |2017-02-02 |2017-02-20 |A |1 |
Run Code Online (Sandbox Code Playgroud)
对于order_id = 3
该paid_order_count
不该2
,但1
作为order_id = 2
后付费order_id = 3
放置。
我希望我能很好地解释我的问题,并期待您的想法!
很好的问题!!!需要注意的是,使用rangeBetween创建一个固定框架,该框架基于其中的行数而不是值,因此在两种情况下会出现问题:
rangeBetween也不适用于日期和时间戳数据类型。
为了解决这个问题,可以使用带有列表和 UDF 的窗口函数:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = spark.sparkContext.parallelize(Seq(
(1, "2017-01-01", "2017-01-10", "A")
, (2, "2017-02-01", "2017-02-10", "A")
, (3, "2017-02-02", "2017-02-20", "A")
)
).toDF("order_id", "order_date", "payment_date", "customer_id")
.withColumn("order_date_ts", to_timestamp($"order_date", "yyyy-MM-dd").cast("long"))
.withColumn("payment_date_ts", to_timestamp($"payment_date", "yyyy-MM-dd").cast("long"))
// df.printSchema()
// df.show(false)
val window = Window.partitionBy("customer_id").orderBy("order_date_ts").rangeBetween(Window.unboundedPreceding, -1)
val count_filtered_dates = udf( (days: Int, top: Long, array: Seq[Long]) => {
val bottom = top - (days * 60 * 60 * 24).toLong // in spark timestamps are in secconds, calculating the date days ago
array.count(v => v >= bottom && v < top)
}
)
val res = df.withColumn("paid_orders", collect_list("payment_date_ts") over window)
.withColumn("paid_order_count", count_filtered_dates(lit(365), $"order_date_ts", $"paid_orders"))
res.show(false)
Run Code Online (Sandbox Code Playgroud)
输出:
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|order_id|order_date|payment_date|customer_id|order_date_ts|payment_date_ts|paid_orders |paid_order_count|
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|1 |2017-01-01|2017-01-10 |A |1483228800 |1484006400 |[] |0 |
|2 |2017-02-01|2017-02-10 |A |1485907200 |1486684800 |[1484006400] |1 |
|3 |2017-02-02|2017-02-20 |A |1485993600 |1487548800 |[1484006400, 1486684800]|1 |
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
Run Code Online (Sandbox Code Playgroud)
将日期转换为 Spark 时间戳(以秒为单位)可以提高列表的内存效率。
这是最容易实现的代码,但不是最佳的,因为列表会占用一些内存,自定义 UDAF 是最好的,但需要更多编码,可能稍后会做。如果每个客户有数千个订单,这仍然有效。
归档时间: |
|
查看次数: |
2917 次 |
最近记录: |