fun*_*iki 5 python aggregate-functions apache-spark pyspark pyspark-sql
我希望能够按步长设置 Spark 组,而不仅仅是单个值。spark 中是否有类似于 PySpark 2.x 的window数字(非日期)值函数?
类似的东西:
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([10, 11, 12, 13], "integer").toDF("foo")
res = df.groupBy(window("foo", step=2, start=10)).count()
Run Code Online (Sandbox Code Playgroud)
您可以重用时间戳一并以秒为单位表达参数。翻滚:
from pyspark.sql.functions import col, window
df.withColumn(
"window",
window(
col("foo").cast("timestamp"),
windowDuration="2 seconds"
).cast("struct<start:bigint,end:bigint>")
).show()
# +---+-------+
# |foo| window|
# +---+-------+
# | 10|[10,12]|
# | 11|[10,12]|
# | 12|[12,14]|
# | 13|[12,14]|
# +---+-------+
Run Code Online (Sandbox Code Playgroud)
滚动一:
df.withColumn(
"window",
window(
col("foo").cast("timestamp"),
windowDuration="2 seconds", slideDuration="1 seconds"
).cast("struct<start:bigint,end:bigint>")
).show()
# +---+-------+
# |foo| window|
# +---+-------+
# | 10| [9,11]|
# | 10|[10,12]|
# | 11|[10,12]|
# | 11|[11,13]|
# | 12|[11,13]|
# | 12|[12,14]|
# | 13|[12,14]|
# | 13|[13,15]|
# +---+-------+
Run Code Online (Sandbox Code Playgroud)
使用groupBy和start:
w = window(col("foo").cast("timestamp"), "2 seconds").cast("struct<start:bigint,end:bigint>")
start = w.start.alias("start")
df.groupBy(start).count().show()
+-----+-----+
|start|count|
+-----+-----+
| 10| 2|
| 12| 2|
+-----+-----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7033 次 |
| 最近记录: |