Dat*_*ner 12 scala dataframe apache-spark apache-spark-sql imputation
我有一个带有一些缺失值的Spark Dataframe.我想通过用该列的平均值替换缺失值来执行简单的估算.我对Spark很新,所以我一直在努力实现这个逻辑.这是我到目前为止所做的事情:
a)要为单个列(比如Col A)执行此操作,这行代码似乎有效:
df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
.first()(0).asInstanceOf[Double])
.otherwise($"ColA"))
Run Code Online (Sandbox Code Playgroud)
b)但是,我无法弄清楚,如何对我的数据帧中的所有列执行此操作.我正在尝试Map函数,但我相信它遍历数据帧的每一行
c)SO上有类似的问题 - 这里.虽然我喜欢这个解决方案(使用聚合表和合并),但我非常想知道是否有办法通过遍历每一列来实现这一点(我来自R,所以使用更高阶函数循环遍历每一列lapply对我来说似乎更自然).
谢谢!
use*_*411 16
Spark> = 2.2
您可以使用org.apache.spark.ml.feature.Imputer(支持均值和中值策略).
斯卡拉:
import org.apache.spark.ml.feature.Imputer
val imputer = new Imputer()
.setInputCols(df.columns)
.setOutputCols(df.columns.map(c => s"${c}_imputed"))
.setStrategy("mean")
imputer.fit(df).transform(df)
Run Code Online (Sandbox Code Playgroud)
Python:
from pyspark.ml.feature import Imputer
imputer = Imputer(
inputCols=df.columns,
outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)
Run Code Online (Sandbox Code Playgroud)
Spark <2.2
这个给你:
import org.apache.spark.sql.functions.mean
df.na.fill(df.columns.zip(
df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)
Run Code Online (Sandbox Code Playgroud)
哪里
df.columns.map(mean(_)): Array[Column]
Run Code Online (Sandbox Code Playgroud)
计算每列的平均值,
df.select(_: *).first.toSeq: Seq[Any]
Run Code Online (Sandbox Code Playgroud)
收集聚合值并将行转换为Seq[Any](我知道它不是最理想的,但这是我们必须使用的API),
df.columns.zip(_).toMap: Map[String,Any]
Run Code Online (Sandbox Code Playgroud)
创建aMap: Map[String, Any]从列名称到其平均值的映射,最后:
df.na.fill(_): DataFrame
Run Code Online (Sandbox Code Playgroud)
使用以下方法填充缺失的值:
fill: Map[String, Any] => DataFrame
Run Code Online (Sandbox Code Playgroud)
来自DataFrameNaFunctions.
对于ingore NaN条目,您可以替换:
df.select(df.columns.map(mean(_)): _*).first.toSeq
Run Code Online (Sandbox Code Playgroud)
有:
import org.apache.spark.sql.functions.{col, isnan, when}
df.select(df.columns.map(
c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq
Run Code Online (Sandbox Code Playgroud)
用于在 PySpark < 2.2 中估算中位数(而不是平均值)
## filter numeric cols
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]
### Compute a dict with <col_name, median_value>
median_dict = dict()
for c in num_cols:
median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
Run Code Online (Sandbox Code Playgroud)
然后,申请na.fill
df_imputed = df.na.fill(median_dict)
Run Code Online (Sandbox Code Playgroud)