当我试图通过读取pickle文件创建一个dask数据帧时,我得到一个错误
import dask.dataframe as dd
ds_df = dd.read_pickle("D:\test.pickle")
AttributeError: 'module' object has no attribute 'read_pickle'
but it works fine with read_csv
Run Code Online (Sandbox Code Playgroud)
在熊猫中它像往常一样成功.
所以请纠正我,如果我在那里做错了什么或在dask我们无法通过阅读pickle文件来创建数据帧.
我有一个URL列表,我很乐意立即将它们读到dask数据框,但看起来read_csv不能使用星号http.有没有办法实现这一目标?
这是一个例子:
link = 'http://web.mta.info/developers/'
data = [ 'data/nyct/turnstile/turnstile_170128.txt',
'data/nyct/turnstile/turnstile_170121.txt',
'data/nyct/turnstile/turnstile_170114.txt',
'data/nyct/turnstile/turnstile_170107.txt'
]
Run Code Online (Sandbox Code Playgroud)
而我想要的是
df = dd.read_csv('XXXX*X')
我的本地计算机上有一个很大的NumPy数组,我想与集群上的Dask.array并行化
import numpy as np
x = np.random.random((1000, 1000, 1000))
Run Code Online (Sandbox Code Playgroud)
但是,当我使用dask.array时,我发现调度程序开始占用大量RAM。为什么是这样?这些数据不应该交给工人吗?
import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))
from dask.distributed import Client
client = Client(...)
x = x.persist()
Run Code Online (Sandbox Code Playgroud) 我正在使用当前使用大(> 5GB).csv文件操作的系统.为了提高性能,我正在测试(A)从磁盘创建数据帧的不同方法(pandas VS dask)以及(B)将结果存储到磁盘的不同方法(.csv VS hdf5文件).
为了衡量绩效,我做了以下几点:
def dask_read_from_hdf():
results_dd_hdf = dd.read_hdf('store.h5', key='period1', columns = ['Security'])
analyzed_stocks_dd_hdf = results_dd_hdf.Security.unique()
hdf.close()
def pandas_read_from_hdf():
results_pd_hdf = pd.read_hdf('store.h5', key='period1', columns = ['Security'])
analyzed_stocks_pd_hdf = results_pd_hdf.Security.unique()
hdf.close()
def dask_read_from_csv():
results_dd_csv = dd.read_csv(results_path, sep = ",", usecols = [0], header = 1, names = ["Security"])
analyzed_stocks_dd_csv = results_dd_csv.Security.unique()
def pandas_read_from_csv():
results_pd_csv = pd.read_csv(results_path, sep = ",", usecols = [0], header = 1, names = ["Security"])
analyzed_stocks_pd_csv = results_pd_csv.Security.unique()
print "dask hdf performance"
%timeit …Run Code Online (Sandbox Code Playgroud) 我正在开发一种用于财务目的的模型。我将整个S&P500组件放在一个文件夹中,其中存储了许多.hdf文件。每个.hdf文件都有其自己的多索引(年-周-分钟)。
顺序代码示例(非并行化):
import os
from classAsset import Asset
def model(current_period, previous_perdiod):
# do stuff on the current period, based on stats derived from previous_period
return results
if __name__ == '__main__':
for hdf_file in os.listdir('data_path'):
asset = Asset(hdf_file)
for year in asset.data.index.get_level_values(0).unique().values:
for week in asset.data.loc[year].index.get_level_values(0).unique().values:
previous_period = asset.data.loc[(start):(end)].Open.values # start and end are defined in another function
current_period = asset.data.loc[year, week].Open.values
model(current_period, previous_period)
Run Code Online (Sandbox Code Playgroud)
为了加快处理过程,我使用multiprocessing.pool在多个.hdf文件上同时运行相同的算法,因此我对处理速度非常满意(我有一个4c / 8t CPU)。但是现在我发现了Dask。
在Dask文档的“ DataFrame概述”中,它们指示:
几乎可并行化的操作(快速):
我有一个Dask DataFrame构造如下:
import dask.dataframe as dd
df = dd.read_csv('matrix.txt', header=None)
type(df) //dask.dataframe.core.DataFrame
Run Code Online (Sandbox Code Playgroud)
有没有办法将这个DataFrame保存为pickle?
例如,
df.to_pickle('matrix.pkl')
Run Code Online (Sandbox Code Playgroud) 我正在努力弄清楚如何让dask延迟工作在涉及创建字典的特定工作流程上.
这里的想法是func1,func2,func3可以同时独立运行,我希望这些函数的结果是新字典中的值z.
from dask.delayed import delayed
x1 = {'a': 1, 'b': 2, 'c': 3}
x2 = {'a': 4, 'b': 5, 'c': 6}
@delayed
def func1(d1, d2):
return d1['a'] + d2['a']
@delayed
def func2(d1, d2):
return d1['b'] - d2['b']
@delayed
def func3(d1, d2):
return d1['c'] * d2['c']
z = {}
z['val1'] = func1(x1, x2)
z['val2'] = func2(x1, x2)
z['val3'] = func3(x1, x2)
Run Code Online (Sandbox Code Playgroud)
当我运行以下操作时,出现错误:
>>> result_dict = z.compute()
AttributeError: 'dict' object has no attribute 'compute'
Run Code Online (Sandbox Code Playgroud)
当我运行以下命令时,它会成功,但结果是元组而不是字典.
>>> result_dict = dask.compute(z) …Run Code Online (Sandbox Code Playgroud) 说,我有一些dask数据帧.我想用它做一些操作,而不是保存到csv并打印它的len.
据我所知,以下代码将使dask计算df两次,我是对的吗?
df = dd.read_csv('path/to/file', dtype=some_dtypes)
#some operations...
df.to_csv("path/to/out/*")
print(len(df))
Run Code Online (Sandbox Code Playgroud)
有可能避免计算两次?
但实际上减少了近6倍的行数
这应该工作:
raw_data.drop('some_great_column', axis=1).compute()
但是该列未删除。在大熊猫中,我使用:
raw_data.drop(['some_great_column'], axis=1, inplace=True)
Run Code Online (Sandbox Code Playgroud)
但是就地在Dask中不存在。有任何想法吗?
如果我将数据对象预分散到多个工作节点上,是否会将它完整地复制到每个工作节点上?如果该数据对象很大,这样做有好处吗?
以该futures接口为例:
client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
results[i] = client.submit(nn_train_func, data, **params)
Run Code Online (Sandbox Code Playgroud)
以该delayed接口为例:
client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
results[i] = delayed(nn_train_func, data, **params)
Run Code Online (Sandbox Code Playgroud)
我问的原因是因为我注意到以下现象:
delayed似乎将数据重新发送到工作程序节点,从而使内存使用量大约增加了一倍。似乎预分散并没有按照我的预期做,这允许工作节点引用预分散的数据。futures接口需要很长的时间来迭代通过循环(显著更长)。我目前不确定如何确定这里的瓶颈。delayed接口,从compute()调用函数的时间到活动在仪表板上反映的时间,存在很大的延迟,我怀疑这是由于数据复制造成的。dask ×10
python ×9
pandas ×4
dataframe ×2
dask-delayed ×1
dictionary ×1
hdf5 ×1
numpy ×1
python-3.x ×1