标签: dask

通过读取Python的dask模块中的pickle文件来创建dask数据帧

当我试图通过读取pickle文件创建一个dask数据帧时,我得到一个错误

import dask.dataframe as dd
ds_df = dd.read_pickle("D:\test.pickle")

AttributeError: 'module' object has no attribute 'read_pickle'

but  it works fine with read_csv
Run Code Online (Sandbox Code Playgroud)

在熊猫中它像往常一样成功.

所以请纠正我,如果我在那里做错了什么或在dask我们无法通过阅读pickle文件来创建数据帧.

python dask

3
推荐指数
1
解决办法
1712
查看次数

如何从URL列表中创建Dask DataFrame?

我有一个URL列表,我很乐意立即将它们读到dask数据框,但看起来read_csv不能使用星号http.有没有办法实现这一目标?

这是一个例子:

link = 'http://web.mta.info/developers/'

data = [     'data/nyct/turnstile/turnstile_170128.txt',
                        'data/nyct/turnstile/turnstile_170121.txt',
                        'data/nyct/turnstile/turnstile_170114.txt',
                        'data/nyct/turnstile/turnstile_170107.txt' 
        ]
Run Code Online (Sandbox Code Playgroud)

而我想要的是

df = dd.read_csv('XXXX*X')

python pandas dask

3
推荐指数
1
解决办法
1211
查看次数

如何使用Dask.array有效地将大型numpy数组发送到集群

我的本地计算机上有一个很大的NumPy数组,我想与集群上的Dask.array并行化

import numpy as np
x = np.random.random((1000, 1000, 1000))
Run Code Online (Sandbox Code Playgroud)

但是,当我使用dask.array时,我发现调度程序开始占用大量RAM。为什么是这样?这些数据不应该交给工人吗?

import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))

from dask.distributed import Client
client = Client(...)
x = x.persist()
Run Code Online (Sandbox Code Playgroud)

numpy dask

3
推荐指数
1
解决办法
1136
查看次数

与HDF5相比,为什么从CSV导入时pandas和dask的性能更好?

我正在使用当前使用大(> 5GB).csv文件操作的系统.为了提高性能,我正在测试(A)从磁盘创建数据帧的不同方法(pandas VS dask)以及(B)将结果存储到磁盘的不同方法(.csv VS hdf5文件).

为了衡量绩效,我做了以下几点:

def dask_read_from_hdf():
    results_dd_hdf = dd.read_hdf('store.h5', key='period1', columns = ['Security'])
    analyzed_stocks_dd_hdf =  results_dd_hdf.Security.unique()
    hdf.close()

def pandas_read_from_hdf():
    results_pd_hdf = pd.read_hdf('store.h5', key='period1', columns = ['Security'])
    analyzed_stocks_pd_hdf =  results_pd_hdf.Security.unique()
    hdf.close()

def dask_read_from_csv():
    results_dd_csv = dd.read_csv(results_path, sep = ",", usecols = [0], header = 1, names = ["Security"])
    analyzed_stocks_dd_csv =  results_dd_csv.Security.unique()

def pandas_read_from_csv():
    results_pd_csv = pd.read_csv(results_path, sep = ",", usecols = [0], header = 1, names = ["Security"])
    analyzed_stocks_pd_csv =  results_pd_csv.Security.unique()

print "dask hdf performance"
%timeit …
Run Code Online (Sandbox Code Playgroud)

python hdf5 dask

3
推荐指数
1
解决办法
1380
查看次数

dask.multiprocessing或pandas + multiprocessing.pool:有什么区别?

我正在开发一种用于财务目的的模型。我将整个S&P500组件放在一个文件夹中,其中存储了许多.hdf文件。每个.hdf文件都有其自己的多索引(年-周-分钟)。

顺序代码示例(非并行化):

import os
from classAsset import Asset


def model(current_period, previous_perdiod):
    # do stuff on the current period, based on stats derived from previous_period
    return results

if __name__ == '__main__':
    for hdf_file in os.listdir('data_path'):
        asset = Asset(hdf_file)
        for year in asset.data.index.get_level_values(0).unique().values:
            for week in asset.data.loc[year].index.get_level_values(0).unique().values:

                previous_period = asset.data.loc[(start):(end)].Open.values  # start and end are defined in another function
                current_period = asset.data.loc[year, week].Open.values

                model(current_period, previous_period)
Run Code Online (Sandbox Code Playgroud)

为了加快处理过程,我使用multiprocessing.pool在多个.hdf文件上同时运行相同的算法,因此我对处理速度非常满意(我有一个4c / 8t CPU)。但是现在我发现了Dask。

Dask文档的“ DataFrame概述”中,它们指示:

几乎可并行化的操作(快速)

  • 元素运算:df.x + df.y,df * df
  • 按行选择:df [df.x> …

python multithreading multiprocessing pandas dask

3
推荐指数
1
解决办法
3935
查看次数

将Dask DataFrame存储为pickle

我有一个Dask DataFrame构造如下:

import dask.dataframe as dd

df = dd.read_csv('matrix.txt', header=None)
type(df) //dask.dataframe.core.DataFrame
Run Code Online (Sandbox Code Playgroud)

有没有办法将这个DataFrame保存为pickle?

例如,

df.to_pickle('matrix.pkl')
Run Code Online (Sandbox Code Playgroud)

python dataframe pandas dask

3
推荐指数
2
解决办法
1247
查看次数

使用dask延迟创建字典值

我正在努力弄清楚如何让dask延迟工作在涉及创建字典的特定工作流程上.

这里的想法是func1,func2,func3可以同时独立运行,我希望这些函数的结果是新字典中的值z.

from dask.delayed import delayed

x1 = {'a': 1, 'b': 2, 'c': 3}
x2 = {'a': 4, 'b': 5, 'c': 6}

@delayed
def func1(d1, d2):
    return d1['a'] + d2['a']

@delayed
def func2(d1, d2):
    return d1['b'] - d2['b']

@delayed
def func3(d1, d2):
    return d1['c'] * d2['c']

z = {}
z['val1'] = func1(x1, x2)
z['val2'] = func2(x1, x2)
z['val3'] = func3(x1, x2)
Run Code Online (Sandbox Code Playgroud)

当我运行以下操作时,出现错误:

>>> result_dict = z.compute()

AttributeError: 'dict' object has no attribute 'compute'
Run Code Online (Sandbox Code Playgroud)

当我运行以下命令时,它会成功,但结果是元组而不是字典.

>>> result_dict = dask.compute(z) …
Run Code Online (Sandbox Code Playgroud)

python dictionary dask dask-delayed

3
推荐指数
1
解决办法
926
查看次数

将dask数据帧保存到csv并找出其长度而不计算两次

说,我有一些dask数据帧.我想用它做一些操作,而不是保存到csv并打印它的len.

据我所知,以下代码将使dask计算df两次,我是对的吗?

df = dd.read_csv('path/to/file', dtype=some_dtypes)
#some operations...
df.to_csv("path/to/out/*")
print(len(df))
Run Code Online (Sandbox Code Playgroud)

有可能避免计算两次?

UPD.当我使用@mdurant的解决方案时会发生这种情况 在此输入图像描述

但实际上减少了近6倍的行数

在此输入图像描述

python dataframe dask

3
推荐指数
1
解决办法
2889
查看次数

使用Dask数据框删除列

这应该工作:

raw_data.drop('some_great_column', axis=1).compute(

但是该列未删除。在大熊猫中,我使用:

raw_data.drop(['some_great_column'], axis=1, inplace=True)
Run Code Online (Sandbox Code Playgroud)

但是就地在Dask中不存在。有任何想法吗?

python python-3.x pandas dask

3
推荐指数
1
解决办法
3353
查看次数

在Dask中预分散数据对象是否有优势?

如果我将数据对象预分散到多个工作节点上,是否会将它完整地复制到每个工作节点上?如果该数据对象很大,这样做有好处吗?

以该futures接口为例:

client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
    results[i] = client.submit(nn_train_func, data, **params)
Run Code Online (Sandbox Code Playgroud)

以该delayed接口为例:

client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
    results[i] = delayed(nn_train_func, data, **params)
Run Code Online (Sandbox Code Playgroud)

我问的原因是因为我注意到以下现象:

  1. 如果我预分散数据,则delayed似乎将数据重新发送到工作程序节点,从而使内存使用量大约增加了一倍。似乎预分散并没有按照我的预期做,这允许工作节点引用预分散的数据。
  2. futures接口需要很长的时间来迭代通过循环(显著更长)。我目前不确定如何确定这里的瓶颈。
  3. 使用该delayed接口,从compute()调用函数的时间到活动在仪表板上反映的时间,存在很大的延迟,我怀疑这是由于数据复制造成的。

python parallel-processing dask

3
推荐指数
1
解决办法
264
查看次数