Writing a Dask dataframe to parquet: "TypeError"

pie*_*e_j 3 python parquet dask

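I am trying to write a parquet file with Dask. The goal is to use its repartition feature, but it seems I cannot even write a simple parquet file without the repartition step...

Here is the code I use to create a parquet file with pyarrow, read it back with Dask, and then write it again.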

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd

file = 'example.parquet'
file_res = 'example_res.parquet'

# Generate a random df
df = pd.DataFrame(np.random.randint(100,size=(100000, 20)),columns=['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T'])

# Write 1st parquet file with pyarrow
table = pa.Table.from_pandas(df)
pq.write_table(table, file, version='1.0')

# Read it back with Dask, and write it again
dd_df = dd.read_parquet(file)
dd_df.to_parquet(file_res)

The final write step ends with TypeError: expected list of bytes. The full log follows:


  File "C:/Users/me/Documents/code/_draft/pyarrow_parquet_store.py", line 31, in <module>
    dd_df.to_parquet(file_res)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\core.py", line 4075, in to_parquet
    return to_parquet(self, path, *args, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\core.py", line 665, in to_parquet
    out = out.compute(**compute_kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 279, in compute
    (result,) = compute(self, traverse=False, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\threaded.py", line 84, in get
    **kwargs

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 486, in get_async
    raise_exception(exc, tb)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 316, in reraise
    raise exc

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 222, in execute_task
    result = _execute_task(task, data)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\utils.py", line 30, in apply
    return func(*args, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py", line 625, in write_partition
    fil, df, fmd.schema, compression=compression, fmd=fmd

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 631, in make_part_file
    rg = make_row_group(f, data, schema, compression=compression)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 619, in make_row_group
    compression=comp)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 513, in write_column
    data, selement)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 254, in encode_plain
    return pack_byte_array(list(out))

  File "fastparquet\speedups.pyx", line 112, in fastparquet.speedups.pack_byte_array

TypeError: expected list of bytes

Thanks for your help. Best.

mdu*_*ant 5

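The problem seems to be the index: it is stored as pure metadata, RangeIndex(start=0, stop=100000, step=1), but Dask infers it to have dtype "object" (i.e. strings or something more complex), and so it tries to write a list of numbers as if they were strings.

While this is a bug, here are some workarounds: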

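  • Don't write the index: dd_df.to_parquet(file_res, write_index=False)
  • For a single partition like this one, the fastparquet API works fine without Dask
  • Drop the index or set a new one
  • Set the dtype of the index
  • Use pyarrow: engine="pyarrow"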