pie*_*e_j 3 python parquet dask
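I am trying to write a Parquet file with Dask. The goal is to use its repartition feature, but it seems I cannot even write a simple Parquet file without the repartition step...
Here is the code I use to create a Parquet file with pyarrow, read it back with Dask, and then write it again.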
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd
file = 'example.parquet'
file_res = 'example_res.parquet'
# Generate a random df
df = pd.DataFrame(np.random.randint(100,size=(100000, 20)),columns=['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T'])
# Write 1st parquet file with pyarrow
table = pa.Table.from_pandas(df)
pq.write_table(table, file, version='1.0')
# Read it back with Dask, and write it again
dd_df = dd.read_parquet(file)
dd_df.to_parquet(file_res)
The last write step ends with TypeError: expected list of bytes. The full log is below:
File "C:/Users/me/Documents/code/_draft/pyarrow_parquet_store.py", line 31, in <module>
dd_df.to_parquet(file_res)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\core.py", line 4075, in to_parquet
return to_parquet(self, path, *args, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\core.py", line 665, in to_parquet
out = out.compute(**compute_kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 279, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 567, in compute
results = schedule(dsk, keys, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\threaded.py", line 84, in get
**kwargs
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 486, in get_async
raise_exception(exc, tb)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 316, in reraise
raise exc
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 222, in execute_task
result = _execute_task(task, data)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\utils.py", line 30, in apply
return func(*args, **kwargs)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py", line 625, in write_partition
fil, df, fmd.schema, compression=compression, fmd=fmd
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 631, in make_part_file
rg = make_row_group(f, data, schema, compression=compression)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 619, in make_row_group
compression=comp)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 513, in write_column
data, selement)
File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 254, in encode_plain
return pack_byte_array(list(out))
File "fastparquet\speedups.pyx", line 112, in fastparquet.speedups.pack_byte_array
TypeError: expected list of bytes
Thanks for your help. Best.
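The problem appears to be the index: it is stored as pure metadata, RangeIndex(start=0, stop=100000, step=1), but Dask infers it to have dtype "object" (i.e. strings or something more complex), and so it tries to write a list of numbers as if they were strings.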
While this is a bug, here are some workarounds:
Do not write the index: dd_df.to_parquet(file_res, write_index=False)
Use the pyarrow engine instead of fastparquet: engine="pyarrow"