Nho*_*hor 28 sql window-functions apache-spark apache-spark-sql pyspark
我有一个DataFrame
带有数据的Spark SQL ,我想要得到的是给定日期范围内当前行之前的所有行.因此,例如,我希望将7天之前的所有行放在给定行之前.我想我需要使用Window Function
像:
Window \
.partitionBy('id') \
.orderBy('start')
Run Code Online (Sandbox Code Playgroud)
这就是问题所在.我想要有rangeBetween
7天的时间,但是我在这个文件中找不到任何内容.Spark甚至提供这样的选择吗?现在我只是得到前面的所有行:
.rowsBetween(-sys.maxsize, 0)
Run Code Online (Sandbox Code Playgroud)
但想要实现以下目标:
.rangeBetween("7 days", 0)
Run Code Online (Sandbox Code Playgroud)
如果有人能帮助我,我将非常感激.提前致谢!
zer*_*323 54
Spark> = 2.3
从Spark 2.3开始,可以使用SQL API来使用区间对象,但DataFrame
API支持仍在进行中.
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, mean(some_value) OVER (
PARTITION BY id
ORDER BY CAST(start AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS mean FROM df""").show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
Run Code Online (Sandbox Code Playgroud)
Spark <2.3
据我所知,Spark和Hive都不可能直接使用.两个require ORDER BY
子句RANGE
都是数字.我发现最接近的是转换为时间戳并在几秒钟内运行.假设start
列包含date
类型:
from pyspark.sql import Row
row = Row("id", "start", "some_value")
df = sc.parallelize([
row(1, "2015-01-01", 20.0),
row(1, "2015-01-06", 10.0),
row(1, "2015-01-07", 25.0),
row(1, "2015-01-12", 30.0),
row(2, "2015-01-01", 5.0),
row(2, "2015-01-03", 30.0),
row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))
Run Code Online (Sandbox Code Playgroud)
一个小帮手和窗口定义:
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col
# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400
Run Code Online (Sandbox Code Playgroud)
最后查询:
w = (Window()
.partitionBy(col("id"))
.orderBy(col("start").cast("timestamp").cast("long"))
.rangeBetween(-days(7), 0))
df.select(col("*"), mean("some_value").over(w).alias("mean")).show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
Run Code Online (Sandbox Code Playgroud)
远非漂亮,但有效.
Spark 3.3发布了,但是......
答案可能与 Spark 1.5.0 一样古老:
datediff
。
datediff(col_name, '1000')
将返回从 1000-01-01 到 col_name 的整数天差。
作为第一个参数,它接受日期、时间戳甚至字符串。
作为第二个,它甚至接受1000
.
日期差异(以天为单位) -取决于订单列的数据类型:
日期
火花3.1+
.orderBy(F.expr("unix_date(col_name)")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
火花2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
时间戳
火花2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
long - 以微秒为单位的 UNIX 时间(例如 1672534861000000)
火花2.1+
.orderBy(F.col("col_name") / 86400_000000).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
long - UNIX 时间(以毫秒为单位)(例如 1672534861000)
火花2.1+
.orderBy(F.col("col_name") / 86400_000).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
long - UNIX 时间(以秒为单位)(例如 1672534861)
火花2.1+
.orderBy(F.col("col_name") / 86400).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
长格式 yyyyMMdd
火花3.3+
.orderBy(F.expr("unix_date(to_date(col_name, 'yyyyMMdd'))")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
火花3.1+
.orderBy(F.expr("unix_date(to_date(cast(col_name as string), 'yyyyMMdd'))")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
火花2.2+
.orderBy(F.expr("datediff(to_date(cast(col_name as string), 'yyyyMMdd'), '1000')")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
火花2.1+
.orderBy(F.unix_timestamp(F.col("col_name").cast('string'), 'yyyyMMdd') / 86400).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
日期格式为 'yyyy-MM-dd' 的字符串
火花3.1+
.orderBy(F.expr("unix_date(to_date(col_name))")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
火花2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
其他日期格式的字符串(例如“MM-dd-yyyy”)
火花3.1+
.orderBy(F.expr("unix_date(to_date(col_name, 'MM-dd-yyyy'))")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
火花2.2+
.orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy'), '1000')")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
火花2.1+
.orderBy(F.unix_timestamp("col_name", 'MM-dd-yyyy') / 86400).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
时间戳格式为 'yyyy-MM-dd HH:mm:ss' 的字符串
火花2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
其他时间戳格式的字符串(例如“MM-dd-yyyy HH:mm:ss”)
火花2.2+
.orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy HH:mm:ss'), '1000')")).rangeBetween(-7, 0)
Run Code Online (Sandbox Code Playgroud)
小智 5
很棒的解决方案@zero323,如果您想在几分钟内而不是像我那样需要几天的时间内进行操作,并且不需要使用id进行分区,因此您只需修改代码的一部分,如我所示:
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, sum(total) OVER (
ORDER BY CAST(reading_date AS timestamp)
RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
) AS sum_total FROM df""").show()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
21804 次 |
最近记录: |