我用这样的pip 安装了Dask:
pip install dask
Run Code Online (Sandbox Code Playgroud)
当我尝试做时,import dask.dataframe as dd我收到以下错误消息:
>>> import dask.dataframe as dd
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/path/to/venv/lib/python2.7/site-packages/dask/__init__.py", line 5, in <module>
from .async import get_sync as get
File "/path/to/venv/lib/python2.7/site-packages/dask/async.py", line 120, in <module>
from toolz import identity
ImportError: No module named toolz
No module named toolz
Run Code Online (Sandbox Code Playgroud)
我注意到文档说明了
pip install dask:仅安装dask,它仅依赖于标准库.如果您只需要任务计划程序,这是合适的.
所以我很困惑为什么这不起作用.
在集群上设置worker时,我对dask和dask.distributed中使用的不同术语感到有些困惑.
我遇到的术语是:线程,进程,处理器,节点,工作者,调度程序.
我的问题是如何设置每个的数量,以及其中任何一个之间是否存在严格或建议的关系.例如:
还有其他建议吗?
我有一个非常大的netCDF文件,我正在使用python中的netCDF4阅读
我无法一次读取此文件,因为它的尺寸(1200 x 720 x 1440)太大,整个文件不能同时在内存中.第一维代表时间,下一个分别代表纬度和经度.
import netCDF4
nc_file = netCDF4.Dataset(path_file, 'r', format='NETCDF4')
for yr in years:
nc_file.variables[variable_name][int(yr), :, :]
Run Code Online (Sandbox Code Playgroud)
然而,一次阅读一年是非常缓慢的.如何加快以下用例的速度?
- 编辑
chunksize是1
我可以阅读一系列年份:nc_file.variables [variable_name] [0:100,:,]
有几个用例:
多年来:
numpy.ma.sum(nc_file.variables[variable_name][int(yr), :, :])
Run Code Online (Sandbox Code Playgroud)# Multiply each year by a 2D array of shape (720 x 1440)
for yr in years:
numpy.ma.sum(nc_file.variables[variable_name][int(yr), :, :] * arr_2d)
Run Code Online (Sandbox Code Playgroud)
# Add 2 netcdf files together
for yr in years:
numpy.ma.sum(nc_file.variables[variable_name][int(yr), :, :] +
nc_file2.variables[variable_name][int(yr), :, :])
Run Code Online (Sandbox Code Playgroud) 我正在将大文本文件转换为hdf存储,希望能够更快地访问数据.转换工作正常,但是从csv文件读取并不是并行完成的.它真的很慢(对于SSD上的1GB文本文件需要大约30分钟,所以我的猜测是它不受IO限制).
有没有办法让它在parralel的多个线程中读取?Sice可能很重要,我目前被迫在Windows下运行 - 以防万一有所不同.
from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df.categorize([ 'Type',
'Condition',
])
df.to_hdf("data/data.hdf", "Measurements", 'w')
Run Code Online (Sandbox Code Playgroud) 对于DASK文档有关重新分割,以减少开销谈到这里.
然而,它们似乎表明您需要预先知道数据帧的外观(即预期数据的1/100).
是否有一种很好的方法可以在不做假设的情况下明智地进行重新分配?目前我只是重新分配npartitions = ncores * magic_number,并设置强制True扩展分区,如果需要.这种尺寸适用于所有方法,但由于我的数据集大小不同,因此绝对不是最理想的.
数据是时间序列数据,但不幸的是不是定期的,我过去曾经按时间频率重新分配,但由于数据的不规则性(这有时几分钟没有数千秒),这将是次优的.
Dask 的根本区别和主要用例是什么?莫丁 | 数据表
我检查了每个库的文档,它们似乎都为熊猫限制提供了“类似”的解决方案
我使用分布式,一个允许并行计算的框架.在这里,我的主要用例是NumPy.当我包含依赖的NumPy代码时np.linalg,我收到一个错误OMP_NUM_THREADS,它与OpenMP库有关.
一个最小的例子:
from distributed import Executor
import numpy as np
e = Executor('144.92.142.192:8786')
def f(x, m=200, n=1000):
A = np.random.randn(m, n)
x = np.random.randn(n)
# return np.fft.fft(x) # tested; no errors
# return np.random.randn(n) # tested; no errors
return A.dot(y).sum() # tested; throws error below
s = [e.submit(f, x) for x in [1, 2, 3, 4]]
s = e.gather(s)
Run Code Online (Sandbox Code Playgroud)
当我使用linalg测试进行测试时,e.gather失败,因为每个作业都会抛出以下错误:
OMP: Error #34: System unable to allocate necessary resources for …Run Code Online (Sandbox Code Playgroud) 我正在解析制表符分隔的数据以创建表格数据,我想将其存储在HDF5中.
我的问题是我必须将数据聚合成一种格式,然后转储到HDF5.这是大约1 TB大小的数据,所以我自然无法将其放入RAM中.Dask可能是完成此任务的最佳方式.
如果我使用解析我的数据来适应一个pandas数据帧,我会这样做:
import pandas as pd
import csv
csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]
readcsvfile = csv.reader(csvfile)
total_df = pd.DataFrame() # create empty pandas DataFrame
for i, line in readcsvfile:
# parse create dictionary of key:value pairs by table field:value, "dictionary_line"
# save dictionary as pandas dataframe
df = pd.DataFrame(dictionary_line, index=[i]) # one line tabular data
total_df = pd.concat([total_df, df]) # creates one big dataframe
Run Code Online (Sandbox Code Playgroud)
使用dask执行相同的任务,用户应该尝试这样的事情:
import pandas as pd
import csv
import dask.dataframe as dd
import …Run Code Online (Sandbox Code Playgroud) 在将dask升级到版本1.15.0之后,我的日志记录停止了工作.
我已经使用logging.config.dictConfig初始化python日志工具,以前这些设置传播给所有工作者.但升级后它不再起作用了.
如果我在每个日志调用之前对每个工作人员进行dictConfig工作,但这不是一个合适的解决方案.
所以问题是如何在我的计算图开始执行之前初始化每个worker的日志记录,并且每个worker只执行一次?
更新:
这个hack用于一个虚拟示例,但对我的系统没有任何影响:
def init_logging():
# logging initializing happens here
...
client = distributed.Client()
client.map(lambda _: init_logging, client.ncores())
Run Code Online (Sandbox Code Playgroud)
更新2:
在深入了解文档后,解决了这个问题:
client.run(init_logging)
Run Code Online (Sandbox Code Playgroud)
所以现在的问题是:这是解决这个问题的正确方法吗?
我想在dask数据帧的单个列上进行频率计数.代码可以工作,但我得到一个没有定义的warning抱怨meta.如果我试图定义meta我得到一个错误AttributeError: 'DataFrame' object has no attribute 'name'.对于这个特定的用例,它看起来不像我需要定义,meta但我想知道如何做以备将来参考.
虚拟数据帧和列频率
import pandas as pd
from dask import dataframe as dd
df = pd.DataFrame([['Sam', 'Alex', 'David', 'Sarah', 'Alice', 'Sam', 'Anna'],
['Sam', 'David', 'David', 'Alice', 'Sam', 'Alice', 'Sam'],
[12, 10, 15, 23, 18, 20, 26]],
index=['Column A', 'Column B', 'Column C']).T
dask_df = dd.from_pandas(df)
Run Code Online (Sandbox Code Playgroud)
In [39]: dask_df.head()
Out[39]:
Column A Column B Column C
0 Sam Sam 12
1 Alex David 10 …Run Code Online (Sandbox Code Playgroud) dask ×10
python ×7
pandas ×4
numpy ×2
bigdata ×1
csv ×1
dataframe ×1
hdf5 ×1
importerror ×1
installation ×1
modin ×1
netcdf ×1
optimization ×1
pip ×1
pytables ×1