Converting a PySpark DataFrame to a pandas DataFrame fails on a timestamp column

cry*_*yxn 10 python dataframe pandas apache-spark pyspark

I create my PySpark DataFrame:

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, TimestampType

# The original snippet relied on an existing `spark` session; one is created here.
spark = SparkSession.builder.getOrCreate()

input_schema = StructType([
    StructField("key", StringType()),
    StructField("headers", ArrayType(
        StructType([
            StructField("key", StringType()),
            StructField("value", StringType())
        ])
    )),
    StructField("timestamp", TimestampType())
])

input_data = [
    ("key1", [{"key": "header1", "value": "value1"}], datetime(2023, 1, 1, 0, 0, 0)),
    ("key2", [{"key": "header2", "value": "value2"}], datetime(2023, 1, 1, 0, 0, 0)),
    ("key3", [{"key": "header3", "value": "value3"}], datetime(2023, 1, 1, 0, 0, 0))
]

df = spark.createDataFrame(input_data, input_schema)

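I want to use pandas' assert_frame_equal(), so I need to convert my DataFrame to a pandas DataFrame.

df.toPandas() raises TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

How can I convert the "timestamp" column without losing any detail of the datetime values? I need them to stay as 2023-01-01 00:00:00 rather than 2023-01-01.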
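
On the 2023-01-01 versus 2023-01-01 00:00:00 concern: within pandas itself, a midnight value in a datetime column still carries its time component. pandas only abbreviates the printed form when every value in the column falls on midnight. Here is a minimal pandas-only sketch (no Spark involved, and the variable names are purely illustrative) that shows this and also casts with an explicit unit, as the error message suggests:

```python
from datetime import datetime

import pandas as pd

# Three midnight timestamps, mirroring the values in the question.
stamps = pd.Series([datetime(2023, 1, 1, 0, 0, 0)] * 3, name="timestamp")

# Cast with an explicit unit, which is what the error message asks for.
stamps_ns = stamps.astype("datetime64[ns]")

# Printing the whole Series may show only the date, but each element
# is still a full timestamp with hours, minutes and seconds.
first = stamps_ns.iloc[0]
as_text = stamps_ns.dt.strftime("%Y-%m-%d %H:%M:%S").tolist()

print(first)    # 2023-01-01 00:00:00
print(as_text)
```

So once the conversion itself succeeds, a date-only display does not by itself mean that information was lost.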

cry*_*yxn 9

I found a solution:

from pyspark.sql.functions import date_format

df = df.withColumn("timestamp", date_format("timestamp", "yyyy-MM-dd HH:mm:ss")).toPandas()

Now I can run

assert_frame_equal(df, test_df)

successfully. The values keep their full 2023-01-01 00:00:00 form, so no precision was lost.
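
Keep in mind that after date_format the "timestamp" column holds strings rather than datetimes, so the frame you compare against has to hold strings too. A minimal pandas-only sketch of that comparison follows. `actual_df` is a hand-built stand-in for the converted frame, and `expected_df` is a hypothetical name for the test fixture (called `test_df` in this answer):

```python
import pandas as pd
from pandas.testing import assert_frame_equal

# Stand-in for the converted frame: timestamps are already formatted strings.
actual_df = pd.DataFrame({
    "key": ["key1", "key2", "key3"],
    "timestamp": ["2023-01-01 00:00:00"] * 3,
})

# The expected frame uses strings as well, so the comparison passes.
expected_df = pd.DataFrame({
    "key": ["key1", "key2", "key3"],
    "timestamp": ["2023-01-01 00:00:00"] * 3,
})
assert_frame_equal(actual_df, expected_df)

# An expected frame with a datetime-typed column does not compare equal,
# because assert_frame_equal checks dtypes by default.
datetime_expected = expected_df.assign(
    timestamp=pd.to_datetime(expected_df["timestamp"])
)
try:
    assert_frame_equal(actual_df, datetime_expected)
    dtypes_match = True
except AssertionError:
    dtypes_match = False

print(dtypes_match)
```

If the test fixture stores real datetimes, either format them as strings the same way or convert the string column back to a datetime dtype before comparing.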


Dan*_*ini 5

I wrote some generic logic to do this programmatically, in case you don't want to hard-code the names of the timestamp columns:

from pyspark.sql.types import TimestampType
from pyspark.sql.functions import date_format

def convert_to_pandas(spark_df):
    """
    This function will safely convert a spark DataFrame to pandas.
    """
    # Iterate over columns and convert each timestamp column to a string
    timestamp_cols = []
    for column in spark_df.schema:
        if isinstance(column.dataType, TimestampType):
            # Append column header to list
            timestamp_cols.append(column.name)
            # Set column to string using date_format function
            spark_df = spark_df.withColumn(
                column.name,
                date_format(column.name, "yyyy-MM-dd HH:mm:ss"))
    # Convert to a pandas DataFrame and reset timestamp columns
    pandas_df = spark_df.toPandas()
    for column_header in timestamp_cols:
        pandas_df[column_header] = pandas_df[
            column_header].astype("datetime64[ns]")

    return pandas_df
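
One caveat with the string round trip used in both answers: the pattern yyyy-MM-dd HH:mm:ss stops at whole seconds, so any fractional seconds in the source column would be dropped. The sketch below illustrates the effect with Python's standard library only, using %Y-%m-%d %H:%M:%S as an analogue of the Spark pattern (an assumption made for illustration; this is not Spark code):

```python
from datetime import datetime

# Python analogue of the Spark pattern "yyyy-MM-dd HH:mm:ss" (illustrative only).
SECONDS_FORMAT = "%Y-%m-%d %H:%M:%S"
# Variant that also keeps microseconds.
MICROS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

original = datetime(2023, 1, 1, 12, 30, 45, 123456)

# Round trip through the seconds-only format: the microseconds are lost.
truncated = datetime.strptime(original.strftime(SECONDS_FORMAT), SECONDS_FORMAT)

# Round trip through the microsecond format: the value survives intact.
preserved = datetime.strptime(original.strftime(MICROS_FORMAT), MICROS_FORMAT)

print(truncated)  # 2023-01-01 12:30:45
print(preserved)  # 2023-01-01 12:30:45.123456
```

For the data in the question this makes no difference, since every value is at a whole second. If sub-second values matter, a Spark pattern with a fraction field, for example yyyy-MM-dd HH:mm:ss.SSSSSS, may be worth checking against the Spark datetime pattern documentation before relying on it.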