标签: dask

从 len 18000 的 Dask 数据帧中采样 n= 2000 会产生错误当“replace=False”时不能采用比总体更大的样本

我有一个从 csv 文件创建的 dask 数据框并len(daskdf)返回 18000 但是当ddSample = daskdf.sample(2000)我收到错误时

ValueError: Cannot take a larger sample than population when 'replace=False'
Run Code Online (Sandbox Code Playgroud)

如果数据框大于样本大小,我可以在不替换的情况下进行采样吗?

python dask

17
推荐指数
1
解决办法
2万
查看次数

如何查看Dask Compute任务的进度?

我想在Jupyternotebook上看到一个进度条,当我使用Dask运行计算任务时,我正在计算一个大的csv文件+ 4GB的"id"列的所有值,所以任何想法?

import dask.dataframe as dd

df = dd.read_csv('data/train.csv')
df.id.count().compute()
Run Code Online (Sandbox Code Playgroud)

distributed-computing python-3.x dask jupyter-notebook

17
推荐指数
1
解决办法
5549
查看次数

将numpy解决方案转换为dask(numpy索引在dask中不起作用)

我正在尝试将我的蒙特卡罗模拟转换numpydask,因为有时数组太大,无法适应内存.因此,我在云中设置了一组计算机:我的dask集群由24个内核和94 GB内存组成.我为这个问题准备了我的代码的简化版本.

我的原始numpy代码如下所示:

def numpy_way(sim_count, sim_days, hist_days):
   historical_data = np.random.normal(111.51, 10, hist_days)
   historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
   historical_multidim[:, :, :, :] = historical_data


   random_days_panel = np.random.randint(low=1,
                                      high=hist_days,
                                      size=(1, 1, sim_count, sim_days))
   future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                      np.arange(1)[:, np.newaxis, np.newaxis],
                                      np.arange(sim_count)[:, np.newaxis],
                                      random_days_panel]
   return future_panel.shape
Run Code Online (Sandbox Code Playgroud)

注意:我只是在这里返回numpy数组的形状(但是因为它是numpy,所以future_panel的元素在内存中是有意义的.

关于功能的一些话:

  • 我正在创建一个随机数组historical_data- 这只是1D
  • 然后将该数组"广播"为4D数组(historical_multidim).这里不使用前两个维度(但它们在我的最终应用程序中)
    • 第三维表示完成了多少次模拟
    • 第四维度是forecasted未来的天数
  • random_days_panel- 只是一个ndarray随机选择的日子.所以shape这个数组的最后一个是:1,1,sim_count,sim_days(在上一点解释)
  • future_panelndarray随机选取的值historical_multidim.即从具有预期形状的历史数据生成的数组(1,1,sim_count,sim_days)

现在,问题是,其中一些步骤没有在dask中实现:

  • historical_multidim[:, …

python numpy dask dask-distributed

16
推荐指数
1
解决办法
677
查看次数

在当前进程完成其引导阶段之前尝试启动一个新进程

我是 dask 的新手,我发现拥有一个可以轻松实现并行化的模块真是太好了。我正在做一个项目,我可以在一台机器上并行化一个循环,正如你在这里看到的。但是,我想转移到dask.distributed. 我对上面的类应用了以下更改:

diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
 from .cutoff import Cosine
 from collections import OrderedDict
 import dask
-import dask.multiprocessing
+from dask.distributed import Client
 import time


@@ -141,13 +141,14 @@ class Gaussian(object):
         for image in images.items():
             computations.append(self.fingerprints_per_image(image))

+        client = Client()
         if self.scaler is None:
-            feature_space = dask.compute(*computations, scheduler='processes',
+            feature_space = dask.compute(*computations, scheduler='distributed',
                                          num_workers=self.cores)
             feature_space = OrderedDict(feature_space)
         else:
             stacked_features …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

16
推荐指数
2
解决办法
1万
查看次数

如何为dask.dataframe指定元数据

文档提供了很好的示例,如何提供元数据.但是,当我为数据帧选择正确的dtypes时,我仍然感到不确定.

  • 我可以做一些meta={'x': int 'y': float, 'z': float}代替meta={'x': 'i8', 'y': 'f8', 'z': 'f8'}吗?
  • 有人可以提示我一个像'i8'这样的可能值列表吗?存在什么类型?
  • 如何指定包含任意对象的列?如何指定仅包含一个类的实例的列?

python pandas dask

15
推荐指数
2
解决办法
6402
查看次数

如何在Dask分发中有效地提交具有大参数的任务?

我想提交具有大(千兆字节)参数的Dask函数.做这个的最好方式是什么?我想用不同的(小)参数多次运行这个函数.

示例(坏)

这使用concurrent.futures接口.我们可以轻松地使用dask.delayed接口.

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass

from dask.distributed import Client
c = Client()

futures = [c.submit(f, x, param) for param in params]
Run Code Online (Sandbox Code Playgroud)

但这比我预期的要慢或导致内存错误.

python dask

15
推荐指数
1
解决办法
3260
查看次数

是否可以将巨大的dask数据框保存到镶木地板中?

我有一个由100,000多行组成的数据帧,每行有100,000列,总共10,000,000,000个浮点值.

我已经成功地在一个csv(制表符分隔的)文件中读取它们,并且我成功地将它们读取到具有250GB RAM的50核Xeon机器并尝试将其写为.parq目录,如下所示:

浮动huge.csv内容保存为字符串,为125GB.

import dask.dataframe as dd
filename = 'huge.csv'
df = dd.read_csv(filename, delimiter='\t', sample=500000000)
df.to_parquet('huge.parq')
Run Code Online (Sandbox Code Playgroud)

它已经写了huge.parq近一个星期,目录是14GB,看起来保存的过程.to_parquet不会很快停止.

并且free -mh显示仍有可用内存但是保存.parq目录所花费的时间非常慢:

$ free -mh
              total        used        free      shared  buff/cache   available
Mem:           251G         98G         52G         10M        101G        152G
Swap:          238G          0B        238G
Run Code Online (Sandbox Code Playgroud)

问题是:

  • 考虑到数据帧和机器的大小,将dask数据帧保存到镶木地板文件是否可行?

  • 保存庞大的数据帧是否正常daskfastparquet花费这么长时间?

  • 有没有办法估计保存镶木地板文件所需的时间?

python dataframe parquet dask fastparquet

15
推荐指数
1
解决办法
2403
查看次数

在python中读取1500万行csv文件的有效方法

对于我的应用程序,我需要读取每个15 M行的多个文件,将它们存储在DataFrame中,然后以HDFS5格式保存DataFrame。

我已经尝试了不同的方法,特别是punks.read_csv(具有chunksize和dtype规范)和dask.dataframe。它们都需要大约90秒才能处理1个文件,因此我想知道是否有一种方法可以按所述方式有效处理这些文件。在下面的代码中,我展示了一些我已经完成的测试代码。

import pandas as pd
import dask.dataframe as dd
import numpy as np
import re 

# First approach
store = pd.HDFStore('files_DFs.h5')

chunk_size = 1e6

df_chunk = pd.read_csv(file,
                sep="\t",
                chunksize=chunk_size,
                usecols=['a', 'b'],
                converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                            "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
                skiprows=15
           )              
chunk_list = [] 


for chunk in df_chunk:
      chunk_list.append(chunk)


df = pd.concat(chunk_list, ignore_index=True)

store[dfname] = df
store.close()

# Second approach

df = dd.read_csv(
        file,
        sep="\t",
        usecols=['a', 'b'],
        converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                    "b": …
Run Code Online (Sandbox Code Playgroud)

python dataframe pandas dask

15
推荐指数
1
解决办法
912
查看次数

Dask的默认pip安装提供"ImportError:No module named toolz"

我用这样的pip 安装了Dask:

pip install dask
Run Code Online (Sandbox Code Playgroud)

当我尝试做时,import dask.dataframe as dd我收到以下错误消息:

>>> import dask.dataframe as dd
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/path/to/venv/lib/python2.7/site-packages/dask/__init__.py", line 5, in <module>
    from .async import get_sync as get
  File "/path/to/venv/lib/python2.7/site-packages/dask/async.py", line 120, in <module>
    from toolz import identity
ImportError: No module named toolz
No module named toolz
Run Code Online (Sandbox Code Playgroud)

我注意到文档说明了

pip install dask:仅安装dask,它仅依赖于标准库.如果您只需要任务计划程序,这是合适的.

所以我很困惑为什么这不起作用.

python installation pip importerror dask

14
推荐指数
1
解决办法
7286
查看次数

为什么来自s3的dask read_csv会保留这么多内存?

我正在使用dask(替换SQL查询)从s3读取一些gzip压缩数据.但是,看起来有一些数据文件的缓存,或者在系统内存中保留的解压缩文件.NB这应该是可运行的,这里的测试数据是在公共s3存储桶中的pandas测试套件中使用的.

import dask.dataframe as dd
import pandas as pd
import psutil as ps
import os

#for easier vis
mb = 1048576

def mytestfunc(file):
    process = ps.Process(os.getpid())

    print('initial memory: {0}'.format(process.memory_info().rss/mb))
    data = dd.read_csv(file, compression = 'gzip', blocksize = None, storage_options = {'anon':True})

    print('dask plan memory: {0}'.format(process.memory_info().rss/mb))

    data = data.compute()
    print('data in memory: {0}'.format(process.memory_info().rss/mb))
    print('data frame usage: {0}'.format(data.memory_usage(deep=True).sum()/mb))
    return data

process = ps.Process(os.getpid())
print('before function call: {0}'.format(process.memory_info().rss/mb))
out = mytestfunc('s3://pandas-test/large_random.csv.gz')
print('After function call: {0}'.format(process.memory_info().rss/mb))
# out = mytestfunc('s3://pandas-test/tips.csv.gz')
# print('After smaller function …
Run Code Online (Sandbox Code Playgroud)

python csv amazon-s3 pandas dask

14
推荐指数
1
解决办法
892
查看次数