标签: dask

使用dask分发时出现OMP_NUM_THREADS错误

我使用分布式,一个允许并行计算的框架.在这里,我的主要用例是NumPy.当我包含依赖的NumPy代码时np.linalg,我收到一个错误OMP_NUM_THREADS,它与OpenMP库有关.

一个最小的例子:

from distributed import Executor
import numpy as np
e = Executor('144.92.142.192:8786')

def f(x, m=200, n=1000):
    A = np.random.randn(m, n)
    x = np.random.randn(n)
    #  return np.fft.fft(x)  # tested; no errors
    #  return np.random.randn(n)  # tested; no errors
    return A.dot(y).sum()  # tested; throws error below

s = [e.submit(f, x) for x in [1, 2, 3, 4]]
s = e.gather(s)
Run Code Online (Sandbox Code Playgroud)

当我使用linalg测试进行测试时,e.gather失败,因为每个作业都会抛出以下错误:

OMP: Error #34: System unable to allocate necessary resources for …
Run Code Online (Sandbox Code Playgroud)

python numpy cluster-computing dask

12
推荐指数
1
解决办法
4294
查看次数

如何将多个pandas数据帧连接到一个大于内存的dask数据帧?

我正在解析制表符分隔的数据以创建表格数据,我想将其存储在HDF5中.

我的问题是我必须将数据聚合成一种格式,然后转储到HDF5.这是大约1 TB大小的数据,所以我自然无法将其放入RAM中.Dask可能是完成此任务的最佳方式.

如果我使用解析我的数据来适应一个pandas数据帧,我会这样做:

import pandas as pd
import csv   

csv_columns = ["COL1", "COL2", "COL3", "COL4",..., "COL55"]
readcsvfile = csv.reader(csvfile)

total_df = pd.DataFrame()    # create empty pandas DataFrame
for i, line in readcsvfile:
    # parse create dictionary of key:value pairs by table field:value, "dictionary_line"
    # save dictionary as pandas dataframe
    df = pd.DataFrame(dictionary_line, index=[i])  # one line tabular data 
    total_df = pd.concat([total_df, df])   # creates one big dataframe
Run Code Online (Sandbox Code Playgroud)

使用dask执行相同的任务,用户应该尝试这样的事情:

import pandas as pd
import csv 
import dask.dataframe as dd
import …
Run Code Online (Sandbox Code Playgroud)

hdf5 bigdata pytables pandas dask

12
推荐指数
1
解决办法
2412
查看次数

fastparquet和pyarrow之间的比较?

经过一番搜索我没有找到一个彻底的比较fastparquetpyarrow

我找到了这篇博客文章(速度的基本比较)。

还有一个github 讨论,声称使用github 创建的文件fastparquet不支持AWS-athena(顺便说一句,情况仍然如此吗?)

什么时候/为什么要在另一个上使用?主要优点和缺点是什么?


我的特定用例是处理数据,方法是将数据dask写入s3,然后使用AWS-athena进行读取/分析。

python parquet dask fastparquet pyarrow

12
推荐指数
4
解决办法
7916
查看次数

dask dataframe如何将列转换为to_datetime

我正在尝试将我的数据帧的一列转换为datetime.在这里讨论之后https://github.com/dask/dask/issues/863我尝试了以下代码:

import dask.dataframe as dd
df['time'].map_partitions(pd.to_datetime, columns='time').compute()
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误消息

ValueError: Metadata inference failed, please provide `meta` keyword
Run Code Online (Sandbox Code Playgroud)

究竟应该把什么放在元下?我应该在df中或仅在'time'列中放置所有列的字典吗?我应该放什么类型的?我尝试过dtype和datetime64,但到目前为止它们都没有.

谢谢你,我感谢你的指导,

更新

我将在这里包含新的错误消息:

1)使用时间戳

df['trd_exctn_dt'].map_partitions(pd.Timestamp).compute()

TypeError: Cannot convert input to Timestamp
Run Code Online (Sandbox Code Playgroud)

2)使用datetime和meta

meta = ('time', pd.Timestamp)
df['time'].map_partitions(pd.to_datetime,meta=meta).compute()
TypeError: to_datetime() got an unexpected keyword argument 'meta'
Run Code Online (Sandbox Code Playgroud)

3)只使用日期时间:陷入2%

    In [14]: df['trd_exctn_dt'].map_partitions(pd.to_datetime).compute()
[                                        ] | 2% Completed |  2min 20.3s
Run Code Online (Sandbox Code Playgroud)

此外,我希望能够在日期中指定格式,就像我在pandas中所做的那样:

pd.to_datetime(df['time'], format = '%m%d%Y'
Run Code Online (Sandbox Code Playgroud)

更新2

更新到Dask 0.11后,我不再遇到meta关键字问题.不过,我无法在2GB数据帧上超过2%.

df['trd_exctn_dt'].map_partitions(pd.to_datetime, meta=meta).compute()
    [                                        ] | 2% Completed |  30min 45.7s
Run Code Online (Sandbox Code Playgroud)

更新3

这样做得更好:

def parse_dates(df):
  return pd.to_datetime(df['time'], …
Run Code Online (Sandbox Code Playgroud)

python pandas dask

11
推荐指数
4
解决办法
1万
查看次数

如何在单个线程中运行dask.distributed集群?

如何在单个线程中运行完整的Dask.distributed集群?我想用它来进行调试或分析.

注意:这是一个经常被问到的问题.我在这里将问题和答案添加到Stack Overflow中,以便将来重用.

python dask

11
推荐指数
1
解决办法
860
查看次数

使用Python在Parquet中嵌套数据

我有一个文件,每行有一个JSON.这是一个示例:

{
    "product": {
        "id": "abcdef",
        "price": 19.99,
        "specs": {
            "voltage": "110v",
            "color": "white"
        }
    },
    "user": "Daniel Severo"
}
Run Code Online (Sandbox Code Playgroud)

我想用以下列创建一个镶木地板文件:

product.id, product.price, product.specs.voltage, product.specs.color, user
Run Code Online (Sandbox Code Playgroud)

我知道镶木地板使用Dremel算法进行嵌套编码,但我无法在python中使用它(不知道为什么).

我是一个沉重的熊猫和dask用户,所以我试图构建的管道是json data -> dask -> parquet -> pandas,虽然如果有人有一个简单的例子,使用Python在镶木地板中创建和读取这些嵌套编码我认为这样就足够了:D

编辑

所以,在挖掘PR之后我发现了这个:https://github.com/dask/fastparquet/pull/177

这基本上就是我想要做的.虽然,我仍然无法让它一直运作.我怎么告诉dask/fastparquet我的product列是嵌套的?

python json parquet dask

11
推荐指数
1
解决办法
6558
查看次数

保存pd.DataFrame时如何强制镶木地板dtypes?

有没有办法强制镶木地板文件将pd.DataFrame列编码为给定类型,即使该列的所有值都为空?镶木地板在其模式中自动分配"null"的事实阻止我将许多文件加载到单个文件中dask.dataframe.

试图使用pandas列投射df.column_name = df.column_name.astype(sometype)不起作用.

我为什么这么问

我想将许多镶木地板文件加载到一个单独的dask.dataframe.所有文件都是pd.DataFrame使用多个实例生成的df.to_parquet(filename).所有数据帧都具有相同的列,但对于某些列,给定列可能只包含空值.当试图将所有文件加载到dask.dataframe(使用时df = dd.read_parquet('*.parquet'),我得到以下错误:

Schema in filename.parquet was different.
id: int64
text: string
[...]
some_column: double

vs

id: int64
text: string
[...]
some_column: null
Run Code Online (Sandbox Code Playgroud)

重现我的问题的步骤

import pandas as pd
import dask.dataframe as dd
a = pd.DataFrame(['1', '1'], columns=('value',))
b = pd.DataFrame([None, None], columns=('value',))
a.to_parquet('a.parquet')
b.to_parquet('b.parquet')
df = dd.read_parquet('*.parquet')  # Reads a and b
Run Code Online (Sandbox Code Playgroud)

这给了我以下内容:

ValueError: Schema in path/to/b.parquet was different. …
Run Code Online (Sandbox Code Playgroud)

python pandas parquet dask pyarrow

11
推荐指数
1
解决办法
2517
查看次数

R 相当于 Python 的 dask

R 中有与 Python 等效的包dask吗?专门用于在单台机器上的大于内存的数据集上运行机器学习算法。

链接到 Python 的Dask页面:https : //dask.pydata.org/en/latest/

来自 Dask 网站:

Dask 本地扩展 Python

Dask 为分析提供高级并行性,为您喜爱的工具提供大规模性能

Dask 的调度程序可扩展到千节点集群,其算法已在世界上一些最大的超级计算机上进行了测试。

但是你不需要一个庞大的集群来开始。Dask 附带了专为在个人机器上使用而设计的调度程序。今天,许多人使用 Dask 在他们的笔记本电脑上扩展计算,使用多个内核进行计算,并使用磁盘进行多余的存储。

python r dask

11
推荐指数
2
解决办法
3375
查看次数

连接dask数据帧和pandas数据帧

我有一个dask数据帧(df),大约有2.5亿行(来自一个10Gb的CSV文件).我有另外ndf25,000行的pandas dataframe().我想通过重复每个项目10,000次,将第一列pandas数据帧添加到dask数据帧.

这是我试过的代码.我已将问题缩小到更小的尺寸.

import dask.dataframe as dd
import pandas as pd
import numpy as np

pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv")
df = dd.read_csv("tempfile.csv")
ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
df['Node'] = np.repeat(ndf[0], 10)
Run Code Online (Sandbox Code Playgroud)

使用此代码,我最终得到一个错误.

ValueError:并非所有分区都已知,无法对齐分区.请用于set_index设置索引.

我可以执行a reset_index()后跟a set_index()df.known_divisions True生成dask数据帧.但这是一项耗时的操作.有没有更好的方法来做我想做的事情?我可以用熊猫本身做到这一点吗?

最终目标是从ndf任何相应行的位置查找df与某些条件匹配的行.

python dataframe pandas dask

11
推荐指数
1
解决办法
518
查看次数

如何指定默认dask调度程序的线程/进程数

有没有办法限制默认线程调度程序使用的内核数(使用dask数据帧时默认)?

使用compute,您可以使用以下命令指定它:

df.compute(get=dask.threaded.get, num_workers=20)
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有办法将其设置为默认值,因此您不需要为每次compute调用指定此项?

例如,在小型集群(例如64个核心)的情况下会很有趣,但是与其他人共享(没有作业系统),并且我不希望在使用dask开始计算时占用所有核心.

python dask

10
推荐指数
1
解决办法
3397
查看次数