Adr*_*rie 6 pandas pyspark spark-dataframe pyspark-sql
你怎么能做与df.fillna(method='bfill')带有 的熊猫数据框相同的事情pyspark.sql.DataFrame?
pyspark 数据框具有该pyspark.sql.DataFrame.fillna方法,但不支持method参数。
在 Pandas 中,您可以使用以下内容回填时间序列:
创建数据
import pandas as pd
index = pd.date_range('2017-01-01', '2017-01-05')
data = [1, 2, 3, None, 5]
df = pd.DataFrame({'data': data}, index=index)
Run Code Online (Sandbox Code Playgroud)
给予
Out[1]:
data
2017-01-01 1.0
2017-01-02 2.0
2017-01-03 3.0
2017-01-04 NaN
2017-01-05 5.0
Run Code Online (Sandbox Code Playgroud)
回填数据框
df = df.fillna(method='bfill')
Run Code Online (Sandbox Code Playgroud)
生成回填框架
Out[2]:
data
2017-01-01 1.0
2017-01-02 2.0
2017-01-03 3.0
2017-01-04 5.0
2017-01-05 5.0
Run Code Online (Sandbox Code Playgroud)
如何为 a 做同样的事情pyspark.sql.DataFrame?
和last函数first及其ignorenulls=True标志可以与窗口结合起来rowsBetween。如果我们想向后填充,我们选择当前行和末尾之间的第一个非空值。如果我们想向前填充,我们选择起始行和当前行之间的最后一个非空值。
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
import sys
df.withColumn(
'data',
F.first(
F.col('data'),
ignorenulls=True
) \
.over(
W.orderBy('date').rowsBetween(0, sys.maxsize)
)
)
Run Code Online (Sandbox Code Playgroud)
填充 Spark 的来源:https://towardsdatascience.com/end-to-end-time-series-interpolation-in-pyspark-filling-the-gap-5ccefc6b7fc9
实际上,分布式数据集的回填并不像 pandas(本地)数据帧那样简单 - 您无法确定要填充的值存在于同一分区中。我将结合使用 crossJoin 和窗口,例如 DF:
df = spark.createDataFrame([
('2017-01-01', None),
('2017-01-02', 'B'),
('2017-01-03', None),
('2017-01-04', None),
('2017-01-05', 'E'),
('2017-01-06', None),
('2017-01-07', 'G')], ['date', 'value'])
df.show()
+----------+-----+
| date|value|
+----------+-----+
|2017-01-01| null|
|2017-01-02| B|
|2017-01-03| null|
|2017-01-04| null|
|2017-01-05| E|
|2017-01-06| null|
|2017-01-07| G|
+----------+-----+
Run Code Online (Sandbox Code Playgroud)
代码是:
from pyspark.sql.window import Window
df.alias('a').crossJoin(df.alias('b')) \
.where((col('b.date') >= col('a.date')) & (col('a.value').isNotNull() | col('b.value').isNotNull())) \
.withColumn('rn', row_number().over(Window.partitionBy('a.date').orderBy('b.date'))) \
.where(col('rn') == 1) \
.select('a.date', coalesce('a.value', 'b.value').alias('value')) \
.orderBy('a.date') \
.show()
+----------+-----+
| date|value|
+----------+-----+
|2017-01-01| B|
|2017-01-02| B|
|2017-01-03| E|
|2017-01-04| E|
|2017-01-05| E|
|2017-01-06| G|
|2017-01-07| G|
+----------+-----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4271 次 |
| 最近记录: |