我有一个从 csv 文件创建的 dask 数据框并len(daskdf)返回 18000 但是当ddSample = daskdf.sample(2000)我收到错误时
ValueError: Cannot take a larger sample than population when 'replace=False'
Run Code Online (Sandbox Code Playgroud)
如果数据框大于样本大小,我可以在不替换的情况下进行采样吗?
我想在Jupyternotebook上看到一个进度条,当我使用Dask运行计算任务时,我正在计算一个大的csv文件+ 4GB的"id"列的所有值,所以任何想法?
import dask.dataframe as dd
df = dd.read_csv('data/train.csv')
df.id.count().compute()
Run Code Online (Sandbox Code Playgroud) 我正在尝试将我的蒙特卡罗模拟转换numpy为dask,因为有时数组太大,无法适应内存.因此,我在云中设置了一组计算机:我的dask集群由24个内核和94 GB内存组成.我为这个问题准备了我的代码的简化版本.
我的原始numpy代码如下所示:
def numpy_way(sim_count, sim_days, hist_days):
historical_data = np.random.normal(111.51, 10, hist_days)
historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
historical_multidim[:, :, :, :] = historical_data
random_days_panel = np.random.randint(low=1,
high=hist_days,
size=(1, 1, sim_count, sim_days))
future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
np.arange(1)[:, np.newaxis, np.newaxis],
np.arange(sim_count)[:, np.newaxis],
random_days_panel]
return future_panel.shape
Run Code Online (Sandbox Code Playgroud)
注意:我只是在这里返回numpy数组的形状(但是因为它是numpy,所以future_panel的元素在内存中是有意义的.
关于功能的一些话:
historical_data- 这只是1Dhistorical_multidim).这里不使用前两个维度(但它们在我的最终应用程序中)
forecasted未来的天数random_days_panel- 只是一个ndarray随机选择的日子.所以shape这个数组的最后一个是:1,1,sim_count,sim_days(在上一点解释)future_panel是ndarray随机选取的值historical_multidim.即从具有预期形状的历史数据生成的数组(1,1,sim_count,sim_days)现在,问题是,其中一些步骤没有在dask中实现:
historical_multidim[:, …我是 dask 的新手,我发现拥有一个可以轻松实现并行化的模块真是太好了。我正在做一个项目,我可以在一台机器上并行化一个循环,正如你在这里看到的。但是,我想转移到dask.distributed. 我对上面的类应用了以下更改:
diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
from .cutoff import Cosine
from collections import OrderedDict
import dask
-import dask.multiprocessing
+from dask.distributed import Client
import time
@@ -141,13 +141,14 @@ class Gaussian(object):
for image in images.items():
computations.append(self.fingerprints_per_image(image))
+ client = Client()
if self.scaler is None:
- feature_space = dask.compute(*computations, scheduler='processes',
+ feature_space = dask.compute(*computations, scheduler='distributed',
num_workers=self.cores)
feature_space = OrderedDict(feature_space)
else:
stacked_features …Run Code Online (Sandbox Code Playgroud) 文档提供了很好的示例,如何提供元数据.但是,当我为数据帧选择正确的dtypes时,我仍然感到不确定.
meta={'x': int 'y': float,
'z': float}代替meta={'x': 'i8', 'y': 'f8', 'z': 'f8'}吗?我想提交具有大(千兆字节)参数的Dask函数.做这个的最好方式是什么?我想用不同的(小)参数多次运行这个函数.
这使用concurrent.futures接口.我们可以轻松地使用dask.delayed接口.
x = np.random.random(size=100000000) # 800MB array
params = list(range(100)) # 100 small parameters
def f(x, param):
pass
from dask.distributed import Client
c = Client()
futures = [c.submit(f, x, param) for param in params]
Run Code Online (Sandbox Code Playgroud)
但这比我预期的要慢或导致内存错误.
我有一个由100,000多行组成的数据帧,每行有100,000列,总共10,000,000,000个浮点值.
我已经成功地在一个csv(制表符分隔的)文件中读取它们,并且我成功地将它们读取到具有250GB RAM的50核Xeon机器并尝试将其写为.parq目录,如下所示:
浮动huge.csv内容保存为字符串,为125GB.
import dask.dataframe as dd
filename = 'huge.csv'
df = dd.read_csv(filename, delimiter='\t', sample=500000000)
df.to_parquet('huge.parq')
Run Code Online (Sandbox Code Playgroud)
它已经写了huge.parq近一个星期,目录是14GB,看起来保存的过程.to_parquet不会很快停止.
并且free -mh显示仍有可用内存但是保存.parq目录所花费的时间非常慢:
$ free -mh
total used free shared buff/cache available
Mem: 251G 98G 52G 10M 101G 152G
Swap: 238G 0B 238G
Run Code Online (Sandbox Code Playgroud)
问题是:
考虑到数据帧和机器的大小,将dask数据帧保存到镶木地板文件是否可行?
保存庞大的数据帧是否正常dask并fastparquet花费这么长时间?
有没有办法估计保存镶木地板文件所需的时间?
对于我的应用程序,我需要读取每个15 M行的多个文件,将它们存储在DataFrame中,然后以HDFS5格式保存DataFrame。
我已经尝试了不同的方法,特别是punks.read_csv(具有chunksize和dtype规范)和dask.dataframe。它们都需要大约90秒才能处理1个文件,因此我想知道是否有一种方法可以按所述方式有效处理这些文件。在下面的代码中,我展示了一些我已经完成的测试代码。
import pandas as pd
import dask.dataframe as dd
import numpy as np
import re
# First approach
store = pd.HDFStore('files_DFs.h5')
chunk_size = 1e6
df_chunk = pd.read_csv(file,
sep="\t",
chunksize=chunk_size,
usecols=['a', 'b'],
converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
"b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
skiprows=15
)
chunk_list = []
for chunk in df_chunk:
chunk_list.append(chunk)
df = pd.concat(chunk_list, ignore_index=True)
store[dfname] = df
store.close()
# Second approach
df = dd.read_csv(
file,
sep="\t",
usecols=['a', 'b'],
converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
"b": …Run Code Online (Sandbox Code Playgroud) 我用这样的pip 安装了Dask:
pip install dask
Run Code Online (Sandbox Code Playgroud)
当我尝试做时,import dask.dataframe as dd我收到以下错误消息:
>>> import dask.dataframe as dd
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/path/to/venv/lib/python2.7/site-packages/dask/__init__.py", line 5, in <module>
from .async import get_sync as get
File "/path/to/venv/lib/python2.7/site-packages/dask/async.py", line 120, in <module>
from toolz import identity
ImportError: No module named toolz
No module named toolz
Run Code Online (Sandbox Code Playgroud)
我注意到文档说明了
pip install dask:仅安装dask,它仅依赖于标准库.如果您只需要任务计划程序,这是合适的.
所以我很困惑为什么这不起作用.
我正在使用dask(替换SQL查询)从s3读取一些gzip压缩数据.但是,看起来有一些数据文件的缓存,或者在系统内存中保留的解压缩文件.NB这应该是可运行的,这里的测试数据是在公共s3存储桶中的pandas测试套件中使用的.
import dask.dataframe as dd
import pandas as pd
import psutil as ps
import os
#for easier vis
mb = 1048576
def mytestfunc(file):
process = ps.Process(os.getpid())
print('initial memory: {0}'.format(process.memory_info().rss/mb))
data = dd.read_csv(file, compression = 'gzip', blocksize = None, storage_options = {'anon':True})
print('dask plan memory: {0}'.format(process.memory_info().rss/mb))
data = data.compute()
print('data in memory: {0}'.format(process.memory_info().rss/mb))
print('data frame usage: {0}'.format(data.memory_usage(deep=True).sum()/mb))
return data
process = ps.Process(os.getpid())
print('before function call: {0}'.format(process.memory_info().rss/mb))
out = mytestfunc('s3://pandas-test/large_random.csv.gz')
print('After function call: {0}'.format(process.memory_info().rss/mb))
# out = mytestfunc('s3://pandas-test/tips.csv.gz')
# print('After smaller function …Run Code Online (Sandbox Code Playgroud) dask ×10
python ×9
pandas ×3
dataframe ×2
amazon-s3 ×1
csv ×1
fastparquet ×1
importerror ×1
installation ×1
numpy ×1
parquet ×1
pip ×1
python-3.x ×1