Man*_*hra 13 unpivot apache-spark apache-spark-sql pyspark
我手头有一个问题声明,我想在spark-sql/pyspark中取消对表的删除.我已经阅读了文档,我可以看到只支持pivot,但到目前为止还没有支持un-pivot.有没有办法实现这个目标?
让我的初始表看起来像这样:
当我使用下面提到的命令在pyspark中进行调整时:
df.groupBy("A").pivot("B").sum("C")
Run Code Online (Sandbox Code Playgroud)
我把它作为输出:
现在我想取消转动的表格.通常,此操作可能会/可能不会根据我转动原始表的方式产生原始表.
到目前为止,Spark-sql并不提供对unpivot的开箱即用支持.有没有办法实现这个目标?
And*_*Ray 29
您可以使用内置堆栈功能,例如在Scala中:
scala> val df = Seq(("G",Some(4),2,None),("H",None,4,Some(5))).toDF("A","X","Y", "Z")
df: org.apache.spark.sql.DataFrame = [A: string, X: int ... 2 more fields]
scala> df.show
+---+----+---+----+
| A| X| Y| Z|
+---+----+---+----+
| G| 4| 2|null|
| H|null| 4| 5|
+---+----+---+----+
scala> df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)")).where("C is not null").show
+---+---+---+
| A| B| C|
+---+---+---+
| G| X| 4|
| G| Y| 2|
| H| Y| 4|
| H| Z| 5|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)
或者在pyspark:
In [1]: df = spark.createDataFrame([("G",4,2,None),("H",None,4,5)],list("AXYZ"))
In [2]: df.show()
+---+----+---+----+
| A| X| Y| Z|
+---+----+---+----+
| G| 4| 2|null|
| H|null| 4| 5|
+---+----+---+----+
In [3]: df.selectExpr("A", "stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)").where("C is not null").show()
+---+---+---+
| A| B| C|
+---+---+---+
| G| X| 4|
| G| Y| 2|
| H| Y| 4|
| H| Z| 5|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)
Spark 3.4+
df = df.melt(['A'], ['X', 'Y', 'Z'], 'B', 'C')
# OR
df = df.unpivot(['A'], ['X', 'Y', 'Z'], 'B', 'C')
Run Code Online (Sandbox Code Playgroud)
+---+---+----+
| A| B| C|
+---+---+----+
| G| Y| 2|
| G| Z|null|
| G| X| 4|
| H| Y| 4|
| H| Z| 5|
| H| X|null|
+---+---+----+
Run Code Online (Sandbox Code Playgroud)
To filter out nulls: df = df.filter("C is not null")
Spark 3.3 and below
to_melt = {'X', 'Y', 'Z'}
new_names = ['B', 'C']
melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
*(set(df.columns) - to_melt),
F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
).filter(f"!{new_names[1]} is null")
Run Code Online (Sandbox Code Playgroud)
Full test:
from pyspark.sql import functions as F
df = spark.createDataFrame([("G", 4, 2, None), ("H", None, 4, 5)], list("AXYZ"))
to_melt = {'X', 'Y', 'Z'}
new_names = ['B', 'C']
melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
*(set(df.columns) - to_melt),
F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
).filter(f"!{new_names[1]} is null")
df.show()
# +---+---+---+
# | A| B| C|
# +---+---+---+
# | G| Y| 2|
# | G| X| 4|
# | H| Y| 4|
# | H| Z| 5|
# +---+---+---+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12791 次 |
| 最近记录: |