标签: dask

Dask ProgressBar 不适用于分布式后端

进度条与multiprocessing后端一起使用时效果很好,但在使用distributed调度程序作为后端时似乎根本不起作用。

有没有解决的办法?还是另一种解决方案?该distributed软件包本身有一些进度条,但它们都需要一个期货列表才能工作。

python distributed progress-bar dask

2
推荐指数
1
解决办法
1492
查看次数

将 HDF 文件加载到 Python Dask DataFrames 列表中

我有一个 HDF5 文件,我想将它加载到 Dask DataFrames 列表中。我已经按照 Dask管道方法的缩写版本使用循环进行了设置。这是代码:

import pandas as pd
from dask import compute, delayed
import dask.dataframe as dd
import os, h5py

@delayed
def load(d,k):
    ddf = dd.read_hdf(os.path.join(d,'Cleaned.h5'), key=k)
    return ddf

if __name__ == '__main__':      
    d = 'C:\Users\User\FileD'
    loaded = [load(d,'/DF'+str(i)) for i in range(1,10)]

    ddf_list = compute(*loaded)
    print(ddf_list[0].head(),ddf_list[0].compute().shape)
Run Code Online (Sandbox Code Playgroud)

我收到此错误消息:

C:\Python27\lib\site-packages\tables\group.py:1187: UserWarning: problems loading leaf ``/DF1/table``::

  HDF5 error back trace

  File "..\..\hdf5-1.8.18\src\H5Dio.c", line 173, in H5Dread
    can't read data
  File "..\..\hdf5-1.8.18\src\H5Dio.c", line 543, in H5D__read
    can't initialize …
Run Code Online (Sandbox Code Playgroud)

hdf5 dataframe python-2.7 h5py dask

2
推荐指数
1
解决办法
2070
查看次数

Dask Dataframe将列表列拆分为多个列

Pandas中的相同任务可以轻松完成

import pandas as pd
df = pd.DataFrame({"lists":[[i, i+1] for i in range(10)]})
df[['left','right']] = pd.DataFrame([x for x in df.lists])
Run Code Online (Sandbox Code Playgroud)

但我无法弄清楚如何做一些类似的东西 dask.dataframe

更新

到目前为止,我发现了这个解决方法

ddf = dd.from_pandas(df, npartitions=2)
ddf["left"] = ddf.apply(lambda x: x["lists"][0], axis=1, meta=pd.Series())
ddf["right"] = ddf.apply(lambda x: x["lists"][1], axis=1, meta=pd.Series())
Run Code Online (Sandbox Code Playgroud)

我想知道是否还有另一种方法可以进行.

python dataframe pandas dask

2
推荐指数
1
解决办法
1504
查看次数

如何将 pandas str.split 调用转换为 dask

我有一个 dask 数据框,其中索引是一个如下所示的字符串:

12/09/2016 00:00;32.0046;-106.259
12/09/2016 00:00;32.0201;-108.838
12/09/2016 00:00;32.0224;-106.004
Run Code Online (Sandbox Code Playgroud)

(它基本上是一个编码日期时间的字符串;纬度;行的经度)

我想在 dask 上下文中将其拆分为代表每个字段的各个列。

我可以用熊猫数据框来做到这一点:

df['date'], df['Lat'], df['Lon'] = df.index.str.split(';', 2).str
Run Code Online (Sandbox Code Playgroud)

但是对于我尝试过的几次尝试,这在 dask 中不起作用。如果我直接将 df 替换为 dask df,则会出现错误:

'Index' object has no attribute 'str'
Run Code Online (Sandbox Code Playgroud)

如果我使用列名而不是索引作为:

forecastDf['date'], forecastDf['Lat'], forecastDf['Lon'] = forecastDf['dateLocation'].str.split(';', 2).str
Run Code Online (Sandbox Code Playgroud)

我收到错误:

TypeError: 'StringAccessor' object is not iterable
Run Code Online (Sandbox Code Playgroud)

这是一个在 Pandas 中工作的可运行示例

import pandas as pd
df = pd.DataFrame()
df['dateLocation'] = ['12/09/2016 00:00;32.0046;-106.259','12/09/2016   00:00;32.0201;-108.838','12/09/2016 00:00;32.0224;-106.004']
df = df.set_index('dateLocation')
df['date'], df['Lat'], df['Lon'] = df.index.str.split(';', 2).str
df.head()
Run Code Online (Sandbox Code Playgroud)

这是我直接将其转换为 dask 时得到的错误

import dask.dataframe as dd …
Run Code Online (Sandbox Code Playgroud)

dask

2
推荐指数
1
解决办法
2048
查看次数

Dask在Groupby上复制Pandas的价值计数

我正在尝试做的是快速复制熊猫的值计数+ idxmax函数,因为我有很多数据。这是一个示例数据框:

partner_num cust_id item_id revw_ratg_num   revw_dt item_qty
0   100 01  5   05/30/2000  0
0   100 03  5   05/30/2000  0
0   100 02  5   05/30/2000  0
1   200 13  4   04/01/2000  0
1   200 14  5   04/01/2000  1
2   200 22  2   04/01/2000  1
3   200 37  3   04/01/2000  1
9   300 92  1   03/24/2000  1
9   300 93  1   03/24/2000  1
9   300 94  1   03/24/2000  0
9   300 99  1   03/24/2000  0
6   300 91  2   03/24/2000  0 …
Run Code Online (Sandbox Code Playgroud)

python bigdata dataframe pandas dask

2
推荐指数
1
解决办法
1054
查看次数

使用 dask 或 joblib multiprocessing 编译可执行文件会导致错误

我正在使用 dask 或 joblib 将一些串行处理的 python 作业转换为多处理。可悲的是,我需要在 Windows 上工作。
当从 IPython 内运行或从命令行使用 python 调用 py 文件时,一切运行正常。
使用 cython 编译可执行文件时,它不再正常运行:越来越多的进程(无限制且大于请求的进程数)开始启动并阻塞我的系统。
它以某种方式感觉像多处理炸弹- 但当然我曾经if __name__=="__main__:"拥有控制块 - 通过在命令行中从 python 调用中正常运行获得批准。
我的 cython 调用是cython --embed --verbose --annotate THECODE.PY,我正在编译gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE生成一个 windows 可执行文件THECODE.exe
其他(单处理)代码运行良好。
dask 和 joblib 的问题似乎相同(这可能意味着 dask 的工作方式类似于或基于 joblib)。
有什么建议?

对于那些对mcve感兴趣的:只需从Multiprocessing Bomb 中获取第一个代码并使用上面的 cython 命令编译它就会导致一个可执行文件炸毁你的系统。(我刚试过:-))

通过在代码示例中添加一行以显示以下内容,我发现了一些有趣的东西__name__: …

python windows cython joblib dask

2
推荐指数
1
解决办法
1822
查看次数

从`dask.DataFrame`中切出几行

通常,当使用large时dask.DataFrame,仅获取几行以测试所有后续操作将很有用。

当前,根据“ 切片Dask数据帧”,此功能不受支持。

  • 我希望然后使用它head来实现相同的功能(因为支持该命令),但是它返回一个常规的熊猫DataFrame。
  • 我也尝试过df[:1000]执行,但是生成的输出与您从Pandas期望的输出不同。

有没有办法从a抓取前1000行dask.DataFrame

dask

2
推荐指数
1
解决办法
1439
查看次数

如何在Dask中进行行处理和项目分配

类似的未回答问题:逐行处理 Dask 数据帧

我正在处理数百万行长的数据帧,所以现在我试图并行执行所有数据帧操作。我需要转换为 Dask 的一个这样的操作是:

 for row in df.itertuples():                                                                                                                                                                                                         
     ratio = row.ratio                                                                                                                                                                                                                     
     tmpratio = row.tmpratio                                                                                                                                                                                                                                                                                                                                                                                                 
     tmplabel = row.tmplabel                                                                                                                                                                                                               
     if tmpratio > ratio:                                                                                                                                                                                                                  
         df.loc[row.Index,'ratio'] = tmpratio                                                                                                                                                                                        
         df.loc[row.Index,'label'] = tmplabel
Run Code Online (Sandbox Code Playgroud)

在 Dask 中按索引设置值或有条件地在行中设置值的合适方法是什么?由于.loc不支持DASK项任务,似乎没有成为set_valueat[]或者iat[]在DASK无论是。

我尝试将map_partitionsassign一起使用,但我没有看到在行级别执行条件分配的任何能力。

python dataframe pandas dask

2
推荐指数
1
解决办法
4189
查看次数

Pandas Apply - 返回多行

我有两个数据框,我需要比较行的完整组合并返回符合条件的组合.事实证明,对于我们的小型集群来说Spark(使用交叉连接)过于密集,所以我正在尝试这种方法,最终会看到是否Dask可以改进它.

如果表A和B是

a=pd.DataFrame(np.array([[1,2,3],[4,5,6]]), columns=['a','b','c'])

b=pd.DataFrame(np.array([[4,7,4],[6,5,1],[8,6,0]]), columns=['d','e','f'])
Run Code Online (Sandbox Code Playgroud)

然后所有组合看起来像这样,其中计算AD.假设我只想保留AD> = - 3的行

A   B   C   D   E   F   A-D
1   2   3   4   7   4   -3
1   2   3   6   5   1   -5
1   2   3   8   6   0   -7
4   5   6   4   7   4   0
4   5   6   6   5   1   -2
4   5   6   8   6   0   -4
Run Code Online (Sandbox Code Playgroud)

我尝试使用apply执行此操作,但似乎我无法返回dataframe函数的多行(该函数创建'A'的单行和'B'的整个表的所有组合并返回行符合标准.

这是我测试的功能:

def return_prox_branches(a, B, cutthresh):

    aa=a['a']-B['d']

    keep_B = B.copy().loc[(aa.values >= cutthresh),:]

    keep_B['A']=a['a']

    keep_B['B']=a['b'] …
Run Code Online (Sandbox Code Playgroud)

python pandas dask

2
推荐指数
1
解决办法
870
查看次数

Spark vs Dask中的容错

我在已知限制部分的Dask文档中阅读了以下内容:

  • 它[Dask]不具有容错能力.任何工人的失败都可能导致系统崩溃.

  • 如果出现错误,它不会优雅地失败

但是在与Spark的比较中我没有看到任何容错的 提及.这些是目前"你可能选择Spark的原因":

  • 您更喜欢Scala或SQL语言
  • 您主要拥有JVM基础架构和遗留系统
  • 您需要一个既定且值得信赖的业务解决方案
  • 您主要使用轻量级机器学习进行业务分析
  • 您想要一个多功能的解决方案

我的问题:

  • Spark实际上是以Dask目前不具备的容错方式设计的吗?
  • Spark 提供了什么类型的容错(理论上/实践中)Dask没有,如果有的话,反之亦然?

apache-spark pyspark dask

2
推荐指数
1
解决办法
340
查看次数