Pra*_*eja 4 python pandas apache-spark apache-spark-sql
我想将数据帧从pandas转换为spark,我正在使用spark_context.createDataFrame()方法来创建数据帧.我也在方法中指定架构createDataFrame().
我想知道的是如何处理特殊情况.例如,转换为Spark数据帧时,pandas中的NaN最终为字符串"NaN".我正在寻找如何获得实际的空值而不是"NaN"的方法.
TL; DR你现在最好的选择就是完全跳过Pandas.
问题的根源是Pandas的表达力不如Spark SQL.Spark提供两者NULL(在SQL意义上,作为缺失值)和NaN(数字不是数字).
来自另一个handm的Pandas没有可用于表示缺失值的原生值.其结果是,它使用像占位符NaN/ NaT或Inf,这是无法区分从实际到火花NaNs和Infs和转换规则取决于色谱柱的类型.唯一的例外是object可以包含None值的列(通常是字符串).您可以从文档中了解有关处理缺失值Pandas的更多信息.
例如,转换为Spark数据帧时,pandas中的NaN最终为字符串"NaN".
这实际上是不正确的.取决于输入列的类型.如果列显示NaN它很可能不是数字值,而不是普通字符串:
from pyspark.sql.functions import isnan, isnull
pdf = pd.DataFrame({
"x": [1, None], "y": [None, "foo"],
"z": [pd.Timestamp("20120101"), pd.Timestamp("NaT")]
})
sdf = spark.createDataFrame(pdf)
sdf.show()
Run Code Online (Sandbox Code Playgroud)
+---+----+-------------------+
| x| y| z|
+---+----+-------------------+
|1.0|null|2012-01-01 00:00:00|
|NaN| foo| null|
+---+----+-------------------+
Run Code Online (Sandbox Code Playgroud)
sdf.select([
f(c) for c in sdf.columns for f in [isnan, isnull]
if (f, c) != (isnan, "z") # isnan cannot be applied to timestamp
]).show()
Run Code Online (Sandbox Code Playgroud)
+--------+-----------+--------+-----------+-----------+
|isnan(x)|(x IS NULL)|isnan(y)|(y IS NULL)|(z IS NULL)|
+--------+-----------+--------+-----------+-----------+
| false| false| false| true| false|
| true| false| false| false| true|
+--------+-----------+--------+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)
在实践中,并行化的本地集合(包括Pandas对象)除了简单的测试和玩具示例之外具有可忽略的重要性,因此您可以始终手动转换数据(跳过可能的箭头优化):
import numpy as np
spark.createDataFrame([
tuple(
None if isinstance(x, (float, int)) and np.isnan(x) else x
for x in record.tolist())
for record in pdf.to_records(index=False)
], pdf.columns.tolist()).show()
Run Code Online (Sandbox Code Playgroud)
+----+----+-------------------+
| x| y| z|
+----+----+-------------------+
| 1.0|null|1325376000000000000|
|null| foo| null|
+----+----+-------------------+
Run Code Online (Sandbox Code Playgroud)
如果缺少/非数字歧义不是问题,那么只需加载数据,并在Spark中替换.
from pyspark.sql.functions import col, when
sdf.select([
when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c
for c, t in sdf.dtypes
]).show()
Run Code Online (Sandbox Code Playgroud)
+----+----+-------------------+
| x| y| z|
+----+----+-------------------+
| 1.0|null|2012-01-01 00:00:00|
|null| foo| null|
+----+----+-------------------+
Run Code Online (Sandbox Code Playgroud)
如果你想加载一个 Pandas df,你可以用 None 替换 NaN:
import pandas as pd
def load_csv(spark, path):
"""read csv to spark df"""
pd_df = pd.read_csv(path)
pd_df = pd_df.where((pd.notnull(pd_df)), None)
df = spark.createDataFrame(pd_df)
return df
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2747 次 |
| 最近记录: |