标签: dask

如何解压延迟 dask 对象的数据帧?

我使用了.map_partitions延迟函数,得到的结果是一个数据帧,每行都有延迟结果。

有什么办法可以解压这些延迟的对象吗?难道我做错了什么?

谢谢。

python pandas dask

1
推荐指数
1
解决办法
95
查看次数

并行化嵌套大 `(15e4 * 15e4)` for 循环以获得成对矩阵

我正在尝试并行化以下代码,它为每一行创建一个成对的结果。如下所示。

def get_custom_value(i, j):
    first = df[df['id'] == i]
    second = df[df['id'] == j]

    return int(first['val_1']) * int(second['val_1']) +\
            int(first['val_2']) * int(second['val_2'])

df = pd.DataFrame(
    {
        'id' : range(4),
        'val_1' : [3, 4, 5, 1],
        'val_2' : [2, 3, 1, 1]
    }
)

n = df.shape[0]

result = []

for i in range(n):
    for j in range(i+1, n):
        temp_value = get_custom_value(i, j)
        result.append([i, j, temp_value])
        if len(result) > 1e5:
            # store it in a local file and reset the result …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing multithreading dask

1
推荐指数
1
解决办法
239
查看次数

如何提高python读取多个csv文件的速度

这是我第一次创建处理包含大量数据的文件的代码,所以我有点卡在这里。

\n

我想要做的是读取路径列表,列出所有需要读取的 csv 文件,从每个文件中检索 HEAD 和 TAIL 并将其放入列表中。

\n

我总共有 621 个 csv 文件,每个文件由 5800 行和 251 列组成\n在此输入图像描述\n这是数据样本

\n
[LOGGING],RD81DL96_1,3,4,5,2,,,,\nLOG01,,,,,,,,,\nDATETIME,INDEX,SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0]\nTIME,INDEX,FF-1(\xef\xbc\x91A) ,FF-1(\xef\xbc\x91B) ,FF-1(\xef\xbc\x91C) ,FF-1(\xef\xbc\x92A),FF-2(\xef\xbc\x91A) ,FF-2(\xef\xbc\x91B) ,FF-2(\xef\xbc\x91C),FF-2(\xef\xbc\x92A)\n47:29.6,1,172,0,139,1258,0,0,400,0\n47:34.6,2,172,0,139,1258,0,0,400,0\n47:39.6,3,172,0,139,1258,0,0,400,0\n47:44.6,4,172,0,139,1263,0,0,400,0\n47:49.6,5,172,0,139,1263,0,0,450,0\n47:54.6,6,172,0,139,1263,0,0,450,0\n
Run Code Online (Sandbox Code Playgroud)\n

问题是,虽然读取所有文件花了大约 13 秒(老实说还是有点慢)

\n

但是当我添加一行追加代码时,这个过程花了很多时间才能完成,大约需要 4 分钟。

\n

以下是代码片段:

\n
# CsvList: [File Path, Change Date, File size, File Name]\nfor x, file in enumerate(CsvList):\n     timeColumn = [\'TIME\']\n     df = dd.read_csv(file[0], sep =\',\', skiprows = 3, encoding= \'CP932\', engine=\'python\', usecols=timeColumn)\n\n     # The process became long when this code is added\n     startEndList.append(list(df.head(1)) + list(df.tail(1))) …
Run Code Online (Sandbox Code Playgroud)

python csv dataframe pandas dask

1
推荐指数
1
解决办法
1965
查看次数

使用 dask 而不使用 > client = Client()

关于黄昏。

我想将 parquet 文件读取为 df 并运行 groupby 函数

我的问题是为什么我应该先运行这段代码?

from dask.distributed import Client, progress
client = Client()
client
Run Code Online (Sandbox Code Playgroud)

不仅仅是

import dask.dataframe as dd
        
df = dd.read_parquet(r'C:\Users\ggg\mis_1.parquet')
g=df.groupby('id')['id'].count().compute() 
Run Code Online (Sandbox Code Playgroud)

对我来说,没有客户它效果更好

python dask

1
推荐指数
1
解决办法
672
查看次数

Dask - 如何在多个数据帧上调用“.compute()”

我有两个在计算中相互依赖的数据帧,我想通过一次compute()调用获得两个数据帧的结果。代码可以总结如下:

import dask
import dask.dataframe
import dask.distributed
import pandas as pd

df = dask.dataframe.from_pandas(
    pd.DataFrame({
        "group": ['a', 'b', 'a', 'b', 'a', 'b', 'b'],
        "var_1": [0, 1, 2, 1, 2, 1, 0],
        "var_2": [1, 1, 2, 1, 2, 1, 0]}), npartitions=2)

with dask.distributed.Client() as client:
    for i in range(10):
        df_agg = foo(df)
        df = bar(df, df_agg)

print(df.compute())
print(df_agg.compute()) # -> I would like to have only one .compute() call and get the results of both dataframes (df and df_agg) …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

1
推荐指数
1
解决办法
34
查看次数

在 dask 中计算()做什么?

我是 dask 的新手,不明白计算()方法在 dask 中究竟做了什么?它是一种在它调用的地方打印对象的方法吗?我已经阅读了其网站上的文档,但不确定我是否理解“具体价值”和“懒惰”这两个术语。

您可以通过调用 .compute() 方法或 dask.compute(...) 函数将任何 dask 集合转换为具体值。此函数将阻塞直到计算完成,直接从惰性 dask 集合到本地内存中的具体值。

我的意思是说“它是一种在它调用的地方打印对象的方法吗?” 就是说,当我创建一个 dask 对象并在 spyder 控制台中调用它时,它会导致dask.array<arange, shape=(11,), dtype=int32, chunksize=(5,)>并且当我在其上调用 compute() 方法时,它会打印该对象。达斯

x 对象已通过以下代码创建:

x = da.arange(11, chunks=5)
Run Code Online (Sandbox Code Playgroud)

python dask

0
推荐指数
1
解决办法
2732
查看次数

使用dask计算移动平均线

我正在尝试计算一个非常大的数据集的移动平均值。行数约为30M。使用pandas来说明如下

df = pd.DataFrame({'cust_id':['a', 'a', 'a', 'b', 'b'], 'sales': [100, 200, 300, 400, 500]})
df['mov_avg'] = df.groupby("cust_id")["sales"].apply(lambda x: x.ewm(alpha=0.5, adjust=False).mean())
Run Code Online (Sandbox Code Playgroud)

这里我使用 pandas 来计算移动平均值。使用上面的方法,在 30M 数据集上计算大约需要 20 分钟。有没有办法在这里利用 DASK?

python pandas dask

0
推荐指数
1
解决办法
1479
查看次数

如何增加 GKE 中 DASK 的调度程序内存

我在 GCP 上部署了一个 kubernetes 集群,结合了 prefect 和 dask。这些作业在正常情况下运行良好,但无法扩展到 2 倍的数据。到目前为止,我已将范围缩小到调度程序因内存使用率过高而关闭。 Dask 调度程序内存 一旦内存使用量达到 2GB,作业就会失败并出现“未检测到心跳”错误。

有一个单独的构建 python 文件可用,我们可以在其中设置工作内存和 cpu。有一个 dask-gateway 软件包,我们可以在其中获取网关选项并设置工作内存。

options.worker_memory = 32
options.worker_cores = 10
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=4, maximum=20)
Run Code Online (Sandbox Code Playgroud)

我无法弄清楚在哪里以及如何增加 dask-scheduler 的内存分配。

Specs:
Cluster Version: 1.19.14-gke.1900
Machine type - n1-highmem-64
Autoscaling set to 6 - 1000 nodes per zone
all nodes are allocated 63.77 CPU and 423.26 GB
Run Code Online (Sandbox Code Playgroud)

python google-kubernetes-engine dask mlops prefect

0
推荐指数
1
解决办法
364
查看次数