Pandas数据帧到Spark数据帧,处理NaN转换为实际null?

Pra*_*eja 4 python pandas apache-spark apache-spark-sql

我想将数据帧从pandas转换为spark,我正在使用spark_context.createDataFrame()方法来创建数据帧.我也在方法中指定架构createDataFrame().

我想知道的是如何处理特殊情况.例如,转换为Spark数据帧时,pandas中的NaN最终为字符串"NaN".我正在寻找如何获得实际的空值而不是"NaN"的方法.

use*_*411 8

TL; DR你现在最好的选择就是完全跳过Pandas.

问题的根源是Pandas的表达力不如Spark SQL.Spark提供两者NULL(在SQL意义上,作为缺失值)和NaN(数字不是数字).

来自另一个handm的Pandas没有可用于表示缺失值的原生值.其结果是,它使用像占位符NaN/ NaTInf,这是无法区分从实际到火花NaNsInfs和转换规则取决于色谱柱的类型.唯一的例外是object可以包含None值的列(通常是字符串).您可以从文档中了解有关处理缺失值Pandas的更多信息.

例如,转换为Spark数据帧时,pandas中的NaN最终为字符串"NaN".

这实际上是不正确的.取决于输入列的类型.如果列显示NaN它很可能不是数字值,而不是普通字符串:

from pyspark.sql.functions import isnan, isnull

pdf = pd.DataFrame({
    "x": [1, None], "y": [None, "foo"], 
    "z": [pd.Timestamp("20120101"), pd.Timestamp("NaT")]
})
sdf = spark.createDataFrame(pdf)

sdf.show()
Run Code Online (Sandbox Code Playgroud)
+---+----+-------------------+
|  x|   y|                  z|
+---+----+-------------------+
|1.0|null|2012-01-01 00:00:00|
|NaN| foo|               null|
+---+----+-------------------+
Run Code Online (Sandbox Code Playgroud)
sdf.select([
    f(c) for c in sdf.columns for f in [isnan, isnull] 
    if (f, c) != (isnan, "z")  # isnan cannot be applied to timestamp 
]).show()
Run Code Online (Sandbox Code Playgroud)
+--------+-----------+--------+-----------+-----------+
|isnan(x)|(x IS NULL)|isnan(y)|(y IS NULL)|(z IS NULL)|
+--------+-----------+--------+-----------+-----------+
|   false|      false|   false|       true|      false|
|    true|      false|   false|      false|       true|
+--------+-----------+--------+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)

在实践中,并行化的本地集合(包括Pandas对象)除了简单的测试和玩具示例之外具有可忽略的重要性,因此您可以始终手动转换数据(跳过可能的箭头优化):

import numpy as np

spark.createDataFrame([
   tuple(
        None if isinstance(x, (float, int)) and np.isnan(x) else x
        for x in record.tolist())
   for record in pdf.to_records(index=False)
], pdf.columns.tolist()).show()
Run Code Online (Sandbox Code Playgroud)
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|1325376000000000000|
|null| foo|               null|
+----+----+-------------------+
Run Code Online (Sandbox Code Playgroud)

如果缺少/非数字歧义不是问题,那么只需加载数据,并在Spark中替换.

from pyspark.sql.functions import col, when 

sdf.select([
    when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c 
    for c, t in sdf.dtypes
]).show()
Run Code Online (Sandbox Code Playgroud)
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|2012-01-01 00:00:00|
|null| foo|               null|
+----+----+-------------------+
Run Code Online (Sandbox Code Playgroud)


jus*_*ess 6

如果你想加载一个 Pandas df,你可以用 None 替换 NaN:

import pandas as pd
def load_csv(spark, path):
    """read csv to spark df"""
    pd_df = pd.read_csv(path)
    pd_df = pd_df.where((pd.notnull(pd_df)), None)
    df = spark.createDataFrame(pd_df)
    return df
Run Code Online (Sandbox Code Playgroud)