cry*_*yxn 10 python dataframe pandas apache-spark pyspark
I create my PySpark DataFrame:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, TimestampType

# Assumed setup: the original snippet uses `spark` without creating it
spark = SparkSession.builder.getOrCreate()

input_schema = StructType([
    StructField("key", StringType()),
    StructField("headers", ArrayType(
        StructType([
            StructField("key", StringType()),
            StructField("value", StringType())
        ])
    )),
    StructField("timestamp", TimestampType())
])
input_data = [
    ("key1", [{"key": "header1", "value": "value1"}], datetime(2023, 1, 1, 0, 0, 0)),
    ("key2", [{"key": "header2", "value": "value2"}], datetime(2023, 1, 1, 0, 0, 0)),
    ("key3", [{"key": "header3", "value": "value3"}], datetime(2023, 1, 1, 0, 0, 0))
]
df = spark.createDataFrame(input_data, input_schema)
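I want to use pandas' assert_frame_equal(), so I need to convert my DataFrame to a pandas DataFrame.
df.toPandas() raises TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.
How can I convert the "timestamp" column without losing the time component of the datetime values? I need them to stay as 2023-01-01 00:00:00 rather than 2023-01-01.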
I found a solution:
from pyspark.sql.functions import date_format
df = df.withColumn("timestamp", date_format("timestamp", "yyyy-MM-dd HH:mm:ss")).toPandas()
Now I can run
assert_frame_equal(df, test_df)
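successfully. The time component is preserved down to the second; note that the "yyyy-MM-dd HH:mm:ss" format would drop any fractional seconds, and that the "timestamp" column now holds strings rather than datetimes.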
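Because date_format() turns the column into strings, the frame you compare against has to hold strings in that column as well. Here is a minimal pandas-only sketch of that point. The frames `actual_df`, `expected_df` and `expected_dt` are hypothetical stand-ins built by hand, so it runs without a Spark session:

```python
import pandas as pd
from pandas.testing import assert_frame_equal

# Stand-in for the result of the date_format(...).toPandas() call:
# the timestamp column holds formatted strings, not datetimes.
actual_df = pd.DataFrame({
    "key": ["key1", "key2"],
    "timestamp": ["2023-01-01 00:00:00", "2023-01-01 00:00:00"],
})

# Hypothetical expected frame with the timestamp column as strings too.
expected_df = pd.DataFrame({
    "key": ["key1", "key2"],
    "timestamp": ["2023-01-01 00:00:00", "2023-01-01 00:00:00"],
})

# Passes: same values and same dtypes in both frames.
assert_frame_equal(actual_df, expected_df)

# An expected frame with real datetimes does not match the string column,
# because assert_frame_equal checks dtypes by default.
expected_dt = expected_df.assign(
    timestamp=pd.to_datetime(expected_df["timestamp"])
)
try:
    assert_frame_equal(actual_df, expected_dt)
    dtypes_match = True
except AssertionError:
    dtypes_match = False

print(dtypes_match)
```

If the expected frame stores datetimes, either format it the same way or convert the string column back to datetimes before comparing.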
I wrote some generic logic to do this programmatically, in case you don't want to hard-code the names of the timestamp columns:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import date_format

def convert_to_pandas(spark_df):
    """
    This function will safely convert a spark DataFrame to pandas.
    """
    # Iterate over columns and convert each timestamp column to a string
    timestamp_cols = []
    for column in spark_df.schema:
        if column.dataType == TimestampType():
            # Append column header to list
            timestamp_cols.append(column.name)
            # Set column to string using date_format function
            spark_df = spark_df.withColumn(
                column.name,
                date_format(column.name, "yyyy-MM-dd HH:mm:ss"))
    # Convert to a pandas DataFrame and reset timestamp columns
    pandas_df = spark_df.toPandas()
    for column_header in timestamp_cols:
        pandas_df[column_header] = pandas_df[
            column_header].astype("datetime64[ns]")
    return pandas_df
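The last step of the function, casting the formatted strings back to "datetime64[ns]", can be tried on its own. This is a pandas-only sketch. The frame below is a hand-built stand-in for what toPandas() returns after date_format(), and the sample values are made up for illustration:

```python
import pandas as pd

# Stand-in for a frame returned by toPandas() after date_format():
# the timestamp column holds "yyyy-MM-dd HH:mm:ss" strings.
pandas_df = pd.DataFrame({
    "key": ["key1", "key2"],
    "timestamp": ["2023-01-01 00:00:00", "2023-06-15 13:45:30"],
})

# Same cast the function applies to each recorded timestamp column.
# Naming the unit explicitly ("[ns]") avoids the unit-less dtype error.
pandas_df["timestamp"] = pandas_df["timestamp"].astype("datetime64[ns]")

restored_dtype = str(pandas_df["timestamp"].dtype)
second_value = pandas_df["timestamp"].iloc[1]

print(restored_dtype)
print(second_value)
```

The hours, minutes and seconds survive the round trip. Anything below one second was already dropped by the format string before the cast.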