Ven*_*thi · 35 · Tags: melt, apache-spark, apache-spark-sql, pyspark
Is there an equivalent of the Pandas melt function in Apache Spark, in PySpark or at least in Scala?
I have been running a sample dataset in Python so far, and now I want to use Spark on the entire dataset.
Thanks in advance.
use*_*411 · 60
There is no built-in function (if you work with SQL and Hive support enabled you can use the stack function, but it is not exposed in Spark and has no native implementation), but rolling your own is trivial.
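As an aside, the stack function mentioned above can be reached from the DataFrame API through selectExpr, since Spark SQL parses it even without a DataFrame-level wrapper. A minimal sketch, assuming the example DataFrame sdf created in the tests further below:

# stack(n, k1, v1, k2, v2, ...) emits n rows per input row;
# the melted columns (here B and C) must share a compatible type
long_sdf = sdf.selectExpr(
    "A",
    "stack(2, 'B', B, 'C', C) as (variable, value)"
)

Required imports for the hand-rolled version: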
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
Example implementation:
def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)
And some tests (based on the Pandas doctests):
import pandas as pd
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                    'B': {0: 1, 1: 3, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6}})
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
Note: to use this with legacy Python versions, remove the type annotations.
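For example, a stripped-down header without annotations (only the signature changes; the body stays exactly as above):

def melt(df, id_vars, value_vars,
         var_name="variable", value_name="value"):
    """Convert DataFrame from wide to long format."""
    # body identical to the annotated version above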
Ahu*_*hue · 21
I came across this question while searching for an implementation of melt in Spark for Scala.
Posting my Scala port in case someone else stumbles upon this question.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

  /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
   *
   *  melt is (kind of) the inverse of pivot
   *  melt is currently (02/2017) not implemented in spark
   *
   *  @see reshape package in R (https://cran.r-project.org/web/packages/reshape/index.html)
   *  @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
   *
   *  @todo method overloading for simple calling
   *
   *  @param id_vars the columns to preserve
   *  @param value_vars the columns to melt
   *  @param var_name the name for the column holding the melted columns' names
   *  @param value_name the name for the column holding the values of the melted columns
   */
  def melt(
      id_vars: Seq[String], value_vars: Seq[String],
      var_name: String = "variable", value_name: String = "value"): DataFrame = {

    // Create array<struct<variable: str, value: ...>>
    val _vars_and_vals = array(
      (for (c <- value_vars)
        yield struct(lit(c).alias(var_name), col(c).alias(value_name))): _*)

    // Add to the DataFrame and explode
    val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    val cols = id_vars.map(col _) ++ {
      for (x <- List(var_name, value_name))
        yield col("_vars_and_vals")(x).alias(x)
    }

    _tmp.select(cols: _*)
  }
}
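With the implicit class in scope, melting becomes a method call on the DataFrame itself. A small usage sketch, assuming a df with the columns A, B and C from the Python example above:

val longDf = df.melt(Seq("A"), Seq("B", "C"))
longDf.show()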
Since I am not that advanced when it comes to Scala, I am sure there is room for improvement.
Any comments are welcome.
小智 · 8
Upvoted user6910411's answer. It works as expected, but it does not handle None values well, so I refactored his melt function into the following:
from pyspark.sql.functions import col, create_map, explode, lit
from pyspark.sql import DataFrame
from typing import Iterable
from itertools import chain

def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create map<key: value> pairing each column name with its value
    _vars_and_vals = create_map(
        list(chain.from_iterable(
            [lit(c), col(c)] for c in value_vars))
    )

    # Exploding a map yields `key` and `value` columns; rename them
    _tmp = df.select(*id_vars, explode(_vars_and_vals)) \
        .withColumnRenamed('key', var_name) \
        .withColumnRenamed('value', value_name)

    return _tmp
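For intuition: exploding a map column produces one row per map entry, with columns named key and value, which is why the two withColumnRenamed calls are all that is needed. A minimal sketch with a hypothetical two-column frame:

demo = spark.createDataFrame([(1, 2)], ["x", "y"])
demo.select(explode(create_map(lit("x"), col("x"), lit("y"), col("y")))).show()

+---+-----+
|key|value|
+---+-----+
|  x|    1|
|  y|    2|
+---+-----+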
Tested with the following DataFrame:
import pandas as pd
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                    'B': {0: 1, 1: 3, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6},
                    'D': {1: 7, 2: 9}})
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])
   A variable  value
0  a        B    1.0
1  b        B    3.0
2  c        B    5.0
3  a        C    2.0
4  b        C    4.0
5  c        C    6.0
6  a        D    NaN
7  b        D    7.0
8  c        D    9.0
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1.0|
| a| C| 2.0|
| a| D| NaN|
| b| B| 3.0|
| b| C| 4.0|
| b| D| 7.0|
| c| B| 5.0|
| c| C| 6.0|
| c| D| 9.0|
+---+--------+-----+