进度条与multiprocessing后端一起使用时效果很好,但在使用distributed调度程序作为后端时似乎根本不起作用。
有没有解决的办法?还是另一种解决方案?该distributed软件包本身有一些进度条,但它们都需要一个期货列表才能工作。
我有一个 HDF5 文件,我想将它加载到 Dask DataFrames 列表中。我已经按照 Dask管道方法的缩写版本使用循环进行了设置。这是代码:
import pandas as pd
from dask import compute, delayed
import dask.dataframe as dd
import os, h5py
@delayed
def load(d,k):
ddf = dd.read_hdf(os.path.join(d,'Cleaned.h5'), key=k)
return ddf
if __name__ == '__main__':
d = 'C:\Users\User\FileD'
loaded = [load(d,'/DF'+str(i)) for i in range(1,10)]
ddf_list = compute(*loaded)
print(ddf_list[0].head(),ddf_list[0].compute().shape)
Run Code Online (Sandbox Code Playgroud)
我收到此错误消息:
C:\Python27\lib\site-packages\tables\group.py:1187: UserWarning: problems loading leaf ``/DF1/table``::
HDF5 error back trace
File "..\..\hdf5-1.8.18\src\H5Dio.c", line 173, in H5Dread
can't read data
File "..\..\hdf5-1.8.18\src\H5Dio.c", line 543, in H5D__read
can't initialize …Run Code Online (Sandbox Code Playgroud) Pandas中的相同任务可以轻松完成
import pandas as pd
df = pd.DataFrame({"lists":[[i, i+1] for i in range(10)]})
df[['left','right']] = pd.DataFrame([x for x in df.lists])
Run Code Online (Sandbox Code Playgroud)
但我无法弄清楚如何做一些类似的东西 dask.dataframe
更新
到目前为止,我发现了这个解决方法
ddf = dd.from_pandas(df, npartitions=2)
ddf["left"] = ddf.apply(lambda x: x["lists"][0], axis=1, meta=pd.Series())
ddf["right"] = ddf.apply(lambda x: x["lists"][1], axis=1, meta=pd.Series())
Run Code Online (Sandbox Code Playgroud)
我想知道是否还有另一种方法可以进行.
我有一个 dask 数据框,其中索引是一个如下所示的字符串:
12/09/2016 00:00;32.0046;-106.259
12/09/2016 00:00;32.0201;-108.838
12/09/2016 00:00;32.0224;-106.004
Run Code Online (Sandbox Code Playgroud)
(它基本上是一个编码日期时间的字符串;纬度;行的经度)
我想在 dask 上下文中将其拆分为代表每个字段的各个列。
我可以用熊猫数据框来做到这一点:
df['date'], df['Lat'], df['Lon'] = df.index.str.split(';', 2).str
Run Code Online (Sandbox Code Playgroud)
但是对于我尝试过的几次尝试,这在 dask 中不起作用。如果我直接将 df 替换为 dask df,则会出现错误:
'Index' object has no attribute 'str'
Run Code Online (Sandbox Code Playgroud)
如果我使用列名而不是索引作为:
forecastDf['date'], forecastDf['Lat'], forecastDf['Lon'] = forecastDf['dateLocation'].str.split(';', 2).str
Run Code Online (Sandbox Code Playgroud)
我收到错误:
TypeError: 'StringAccessor' object is not iterable
Run Code Online (Sandbox Code Playgroud)
这是一个在 Pandas 中工作的可运行示例
import pandas as pd
df = pd.DataFrame()
df['dateLocation'] = ['12/09/2016 00:00;32.0046;-106.259','12/09/2016 00:00;32.0201;-108.838','12/09/2016 00:00;32.0224;-106.004']
df = df.set_index('dateLocation')
df['date'], df['Lat'], df['Lon'] = df.index.str.split(';', 2).str
df.head()
Run Code Online (Sandbox Code Playgroud)
这是我直接将其转换为 dask 时得到的错误
import dask.dataframe as dd …Run Code Online (Sandbox Code Playgroud) 我正在尝试做的是快速复制熊猫的值计数+ idxmax函数,因为我有很多数据。这是一个示例数据框:
partner_num cust_id item_id revw_ratg_num revw_dt item_qty
0 100 01 5 05/30/2000 0
0 100 03 5 05/30/2000 0
0 100 02 5 05/30/2000 0
1 200 13 4 04/01/2000 0
1 200 14 5 04/01/2000 1
2 200 22 2 04/01/2000 1
3 200 37 3 04/01/2000 1
9 300 92 1 03/24/2000 1
9 300 93 1 03/24/2000 1
9 300 94 1 03/24/2000 0
9 300 99 1 03/24/2000 0
6 300 91 2 03/24/2000 0 …Run Code Online (Sandbox Code Playgroud) 我正在使用 dask 或 joblib 将一些串行处理的 python 作业转换为多处理。可悲的是,我需要在 Windows 上工作。
当从 IPython 内运行或从命令行使用 python 调用 py 文件时,一切运行正常。
使用 cython 编译可执行文件时,它不再正常运行:越来越多的进程(无限制且大于请求的进程数)开始启动并阻塞我的系统。
它以某种方式感觉像多处理炸弹- 但当然我曾经if __name__=="__main__:"拥有控制块 - 通过在命令行中从 python 调用中正常运行获得批准。
我的 cython 调用是cython --embed --verbose --annotate THECODE.PY,我正在编译gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE生成一个 windows 可执行文件THECODE.exe。
其他(单处理)代码运行良好。
dask 和 joblib 的问题似乎相同(这可能意味着 dask 的工作方式类似于或基于 joblib)。
有什么建议?
对于那些对mcve感兴趣的人:只需从Multiprocessing Bomb 中获取第一个代码并使用上面的 cython 命令编译它就会导致一个可执行文件炸毁你的系统。(我刚试过:-))
通过在代码示例中添加一行以显示以下内容,我发现了一些有趣的东西__name__: …
通常,当使用large时dask.DataFrame,仅获取几行以测试所有后续操作将很有用。
当前,根据“ 切片Dask数据帧”,此功能不受支持。
head来实现相同的功能(因为支持该命令),但是它返回一个常规的熊猫DataFrame。df[:1000]执行,但是生成的输出与您从Pandas期望的输出不同。有没有办法从a抓取前1000行dask.DataFrame?
类似的未回答问题:逐行处理 Dask 数据帧
我正在处理数百万行长的数据帧,所以现在我试图并行执行所有数据帧操作。我需要转换为 Dask 的一个这样的操作是:
for row in df.itertuples():
ratio = row.ratio
tmpratio = row.tmpratio
tmplabel = row.tmplabel
if tmpratio > ratio:
df.loc[row.Index,'ratio'] = tmpratio
df.loc[row.Index,'label'] = tmplabel
Run Code Online (Sandbox Code Playgroud)
在 Dask 中按索引设置值或有条件地在行中设置值的合适方法是什么?由于.loc不支持DASK项任务,似乎没有成为set_value,at[]或者iat[]在DASK无论是。
我尝试将map_partitions与assign一起使用,但我没有看到在行级别执行条件分配的任何能力。
我有两个数据框,我需要比较行的完整组合并返回符合条件的组合.事实证明,对于我们的小型集群来说Spark(使用交叉连接)过于密集,所以我正在尝试这种方法,最终会看到是否Dask可以改进它.
如果表A和B是
a=pd.DataFrame(np.array([[1,2,3],[4,5,6]]), columns=['a','b','c'])
b=pd.DataFrame(np.array([[4,7,4],[6,5,1],[8,6,0]]), columns=['d','e','f'])
Run Code Online (Sandbox Code Playgroud)
然后所有组合看起来像这样,其中计算AD.假设我只想保留AD> = - 3的行
A B C D E F A-D
1 2 3 4 7 4 -3
1 2 3 6 5 1 -5
1 2 3 8 6 0 -7
4 5 6 4 7 4 0
4 5 6 6 5 1 -2
4 5 6 8 6 0 -4
Run Code Online (Sandbox Code Playgroud)
我尝试使用apply执行此操作,但似乎我无法返回dataframe函数的多行(该函数创建'A'的单行和'B'的整个表的所有组合并返回行符合标准.
这是我测试的功能:
def return_prox_branches(a, B, cutthresh):
aa=a['a']-B['d']
keep_B = B.copy().loc[(aa.values >= cutthresh),:]
keep_B['A']=a['a']
keep_B['B']=a['b'] …Run Code Online (Sandbox Code Playgroud) 我在已知限制部分的Dask文档中阅读了以下内容:
它[Dask]不具有容错能力.任何工人的失败都可能导致系统崩溃.
如果出现错误,它不会优雅地失败
但是在与Spark的比较中我没有看到任何容错的 提及.这些是目前"你可能选择Spark的原因":
- 您更喜欢Scala或SQL语言
- 您主要拥有JVM基础架构和遗留系统
- 您需要一个既定且值得信赖的业务解决方案
- 您主要使用轻量级机器学习进行业务分析
- 您想要一个多功能的解决方案
我的问题:
dask ×10
python ×6
dataframe ×4
pandas ×4
apache-spark ×1
bigdata ×1
cython ×1
distributed ×1
h5py ×1
hdf5 ×1
joblib ×1
progress-bar ×1
pyspark ×1
python-2.7 ×1
windows ×1