use*_*314 2 zip pandas dask dask-delayed
我正在尝试从一组压缩的 CSV 文件创建 dask 数据框。阅读问题,似乎 dask 需要使用 dask.distributedelasted()
import glob
import dask.dataframe as dd
import zipfile
import pandas as pd
from dask.delayed import delayed
#Create zip_dict with key-value pairs for .zip & .csv names
file_list = glob.glob('my_directory/zip_files/')
zip_dict = {}
for f in file_list:
key = f.split('/')[5][:-4]
zip_dict[key] = zipfile.ZipFile(f)
Run Code Online (Sandbox Code Playgroud)
zip_dict = {'log20160201': zipfile.ZipFile filename='/my_directory/zip_files/log20160201.zip' mode='r', 'log20160218': zipfile.ZipFile filename='/my_directory/zip_files/log20160218.zip' 的示例内容模式='r'}
# Create list of delayed pd.read_csv()
d_rows = []
for k, v in zip_dict.items():
row = delayed(pd.read_csv)(v.open(k+'.csv'),usecols=['time','cik'])
d_rows.append(row)
v.close()
Run Code Online (Sandbox Code Playgroud)
d_rows = [Delayed('read_csv-c05dc861-79c3-4e22-8da6-927f5b7da123'), Delayed('read_csv-4fe1c901-44b4-478b-9c11-4a80f7a639e2')] 的示例内容
big_df = dd.from_delayed(d_rows)
Run Code Online (Sandbox Code Playgroud)
返回的错误是: ValueError:无效的文件路径或缓冲区对象类型:类“list”
在这种情况下,我认为您实际上不需要字典zip_dict来用 Pandas 懒惰地读取这些压缩文件。基于这个非常相似的SO问题,使用Dask读取(.gz)压缩*.csv文件(也显示在此处),并且由于您有多个文件要加载,因此至少有两种可能的方法可供您使用
dask.delayed和pandas.read_csv
pandas.DataFrame,但您不会实际执行读入内存的操作,而是会延迟此操作,从而创建延迟对象的列表(至少有两种方法可以创建此列表,如下所示)
for,就像[delayed(pd.read_csv)(f) for f in file_list]
.csv.zip文件,那么这将创建一个包含 17 个延迟对象的列表map要使用and创建列表functools.partial,这会创建一个单元素列表,看起来像list(map(functools.partial(delayed(pd.read_csv), file_list)))
.csv.zip文件,那么这将创建一个包含 1 个延迟对象的列表dd.from_delayed这个延迟对象列表转换成pandas.DataFrame
dd.from_delayed(dfs)map()andfunctools.partial方法,您将使用dd.from_delayed(dfs).repartition(file_list)
dask.dataframe中的效果dask.dataframedask.dataframe,您需要使用.repartition()dask.dataframe.read_csv(file_list) 直接,它实际上使用pandas.read_csv,因此它接受来自的许多关键字参数pandas.read_csv在这两种方法中
dtypes列(按照建议)是 Dask 的最佳实践{"time": int, "cik": int},因为你只需要列time,cik并且你知道它们中的每一个都应该是int(整数)dtype.read_csv()关键字
usecols指定所需的列名称列表compression表示.zip正在读入文件下面是实现这些方法的代码,并根据需要提供简短的注释
from functools import partial
from itertools import repeat
from glob import glob
from collections import OrderedDict
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.delayed import delayed
Run Code Online (Sandbox Code Playgroud)
使用这个SO答案,生成多个.csv文件
def generate_multiple_csvs(data_dir, file_num=1):
col_names = list("ABCDEFG")+["time", "cik"]
df = pd.DataFrame(np.random.randint(10, size=(10,9)), columns=col_names)
filename = f"data_file_{file_num}.csv.zip"
filepath = data_dir + "/" + filename
df["filepath"] = filename
df.to_csv(filepath, index=False, compression="zip")
return df
# Specify path the directory where `.csv.zip` files should be created
data_dir = "data/processed"
# Specify number of files to create
num_files_wanted = 8
Run Code Online (Sandbox Code Playgroud)
用于itertools.repeat创建虚拟文件
_ = list(
map(
generate_multiple_csvs,
repeat(data_dir, num_files_wanted),
list(range(1, num_files_wanted+1)),
)
)
Run Code Online (Sandbox Code Playgroud)
用于functools.partial创建虚拟文件
_ = list(
map(
partial(generate_multiple_csvs, data_dir),
list(range(9, 9+num_files_wanted+1)),
)
)
Run Code Online (Sandbox Code Playgroud)
file_list = glob(data_dir + "/" + "*.zip")
Run Code Online (Sandbox Code Playgroud)
dtypes(推荐)my_dtypes = OrderedDict([("time",int), ("cik",int)])
Run Code Online (Sandbox Code Playgroud)
dask.delayed循环for# Lazily reading files into Pandas DataFrames by looping
dfs = [
delayed(pd.read_csv)(f, compression='zip', usecols=['time','cik'])
for f in file_list
]
# Combine into a single Dask DataFrame
ddf_from_delayed_loop = dd.from_delayed(dfs, meta=my_dtypes)
print(type(ddf_from_delayed_loop))
print(ddf_from_delayed_loop)
Run Code Online (Sandbox Code Playgroud)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
time cik
npartitions=17
int64 int64
... ...
... ... ...
... ...
... ...
Dask Name: from-delayed, 34 tasks
Run Code Online (Sandbox Code Playgroud)
dask.delayedwithmap# Lazily reading files into Pandas DataFrames with Python's built-in map()
dfs = list(
map(
partial(
delayed(pd.read_csv),
compression="zip",
usecols=['time', 'cik'],
),
file_list,
)
)
# Combine into a single Dask DataFrame and repartition
ddf_from_delayed_map = dd.from_delayed(dfs, meta=my_dtypes).repartition(
npartitions=len(file_list)
)
print(type(ddf_from_delayed_map))
print(ddf_from_delayed_map)
Run Code Online (Sandbox Code Playgroud)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
time cik
npartitions=17
int64 int64
... ...
... ... ...
... ...
... ...
Dask Name: from-delayed, 34 tasks
Run Code Online (Sandbox Code Playgroud)
dask.dataframe# Lazily reading files into single Dask DataFrame
ddf_direct = dd.read_csv(
data_dir+"/*.csv.zip",
compression='zip',
dtype=my_dtypes,
blocksize=None,
usecols=['time','cik'],
)
print(type(ddf_direct))
print(ddf_direct)
Run Code Online (Sandbox Code Playgroud)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
time cik
npartitions=17
int64 int64
... ...
... ... ...
... ...
... ...
Dask Name: read-csv, 17 tasks
Run Code Online (Sandbox Code Playgroud)
dask.delayedfor以减少大量调用的开销(有关批处理实现,dask.delayed请参阅此 SO 问题)。| 归档时间: |
|
| 查看次数: |
3141 次 |
| 最近记录: |