Ray*_*ell 5 pandas apache-spark parquet dask pyarrow
是否有 dask 相当于 Spark 在读取镶木地板文件时指定模式的能力?可能使用传递给 pyarrow 的 kwargs 吗?
我的桶中有一堆镶木地板文件,但某些字段的名称略有不一致。我可以在阅读它们后创建一个自定义延迟函数来处理这些情况,但我希望在通过全局打开它们时可以指定模式。也许不是,因为我猜想通过 globing 打开然后会尝试将它们连接起来。由于字段名称不一致,目前此操作失败。
创建镶木地板文件:
import dask.dataframe as dd
df = dd.demo.make_timeseries(
start="2000-01-01",
end="2000-01-03",
dtypes={"id": int, "z": int},
freq="1h",
partition_freq="24h",
)
df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)
Run Code Online (Sandbox Code Playgroud)
通过 dask 读取它并在读取后指定模式:
df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})
Run Code Online (Sandbox Code Playgroud)
通过 Spark 读取它并指定模式:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()
schema = T.StructType(
[
T.StructField("id", T.IntegerType()),
T.StructField("a", T.FloatType()),
T.StructField("timestamp", T.TimestampType()),
]
)
df = spark.read.format("parquet").schema(schema).load("df.parquet")
Run Code Online (Sandbox Code Playgroud)
一些选项是:
custom_dtypes = {"a": float, "id": int, "timestamp": pd.datetime}
df = dd.read_parquet("df.parquet", engine="pyarrow").astype(custom_dtypes)
Run Code Online (Sandbox Code Playgroud)
由于字段名称不一致,目前此操作失败。
delayed 之前使用自定义:@delayed
def custom_load(path):
df = pd.read_parquet(path)
# some logic to ensure consistent columns
# for example:
if "z" in df.columns:
df = df.rename(columns={"z": "a"}).astype(custom_dtypes)
return df
dask_df = dd.from_delayed([custom_load(path) for path in glob.glob("some_path/*parquet")])
Run Code Online (Sandbox Code Playgroud)