dask 读取镶木地板并指定架构

Ray*_*ell 5 pandas apache-spark parquet dask pyarrow

是否有 dask 相当于 Spark 在读取镶木地板文件时指定模式的能力?可能使用传递给 pyarrow 的 kwargs 吗?

我的桶中有一堆镶木地板文件,但某些字段的名称略有不一致。我可以在阅读它们后创建一个自定义延迟函数来处理这些情况,但我希望在通过全局打开它们时可以指定模式。也许不是,因为我猜想通过 globing 打开然后会尝试将它们连接起来。由于字段名称不一致,目前此操作失败。

创建镶木地板文件:

import dask.dataframe as dd

df = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-01-03",
    dtypes={"id": int, "z": int},
    freq="1h",
    partition_freq="24h",
)

df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)
Run Code Online (Sandbox Code Playgroud)

通过 dask 读取它并在读取后指定模式:

df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})
Run Code Online (Sandbox Code Playgroud)

通过 Spark 读取它并指定模式:

from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()

schema = T.StructType(
    [
        T.StructField("id", T.IntegerType()),
        T.StructField("a", T.FloatType()),
        T.StructField("timestamp", T.TimestampType()),
    ]
)

df = spark.read.format("parquet").schema(schema).load("df.parquet")
Run Code Online (Sandbox Code Playgroud)

Sul*_*yev 4

一些选项是:

  1. 加载后指定数据类型(需要一致的列名):
custom_dtypes = {"a": float, "id": int, "timestamp": pd.datetime}
df = dd.read_parquet("df.parquet", engine="pyarrow").astype(custom_dtypes)
Run Code Online (Sandbox Code Playgroud)

由于字段名称不一致,目前此操作失败。

  1. 如果文件之间的列名称不相同,您可能需要在加载delayed 之前使用自定义:
@delayed
def custom_load(path):
   df = pd.read_parquet(path)
   # some logic to ensure consistent columns
   # for example:
   if "z" in df.columns:
      df = df.rename(columns={"z": "a"}).astype(custom_dtypes)
   return df

dask_df = dd.from_delayed([custom_load(path) for path in glob.glob("some_path/*parquet")])
Run Code Online (Sandbox Code Playgroud)