我使用了.map_partitions延迟函数,得到的结果是一个数据帧,每行都有延迟结果。
有什么办法可以解压这些延迟的对象吗?难道我做错了什么?
谢谢。
我正在尝试并行化以下代码,它为每一行创建一个成对的结果。如下所示。
def get_custom_value(i, j):
first = df[df['id'] == i]
second = df[df['id'] == j]
return int(first['val_1']) * int(second['val_1']) +\
int(first['val_2']) * int(second['val_2'])
df = pd.DataFrame(
{
'id' : range(4),
'val_1' : [3, 4, 5, 1],
'val_2' : [2, 3, 1, 1]
}
)
n = df.shape[0]
result = []
for i in range(n):
for j in range(i+1, n):
temp_value = get_custom_value(i, j)
result.append([i, j, temp_value])
if len(result) > 1e5:
# store it in a local file and reset the result …Run Code Online (Sandbox Code Playgroud) 这是我第一次创建处理包含大量数据的文件的代码,所以我有点卡在这里。
\n我想要做的是读取路径列表,列出所有需要读取的 csv 文件,从每个文件中检索 HEAD 和 TAIL 并将其放入列表中。
\n我总共有 621 个 csv 文件,每个文件由 5800 行和 251 列组成\n
\n这是数据样本
[LOGGING],RD81DL96_1,3,4,5,2,,,,\nLOG01,,,,,,,,,\nDATETIME,INDEX,SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0],SHORT[DEC.0]\nTIME,INDEX,FF-1(\xef\xbc\x91A) ,FF-1(\xef\xbc\x91B) ,FF-1(\xef\xbc\x91C) ,FF-1(\xef\xbc\x92A),FF-2(\xef\xbc\x91A) ,FF-2(\xef\xbc\x91B) ,FF-2(\xef\xbc\x91C),FF-2(\xef\xbc\x92A)\n47:29.6,1,172,0,139,1258,0,0,400,0\n47:34.6,2,172,0,139,1258,0,0,400,0\n47:39.6,3,172,0,139,1258,0,0,400,0\n47:44.6,4,172,0,139,1263,0,0,400,0\n47:49.6,5,172,0,139,1263,0,0,450,0\n47:54.6,6,172,0,139,1263,0,0,450,0\nRun Code Online (Sandbox Code Playgroud)\n问题是,虽然读取所有文件花了大约 13 秒(老实说还是有点慢)
\n但是当我添加一行追加代码时,这个过程花了很多时间才能完成,大约需要 4 分钟。
\n以下是代码片段:
\n# CsvList: [File Path, Change Date, File size, File Name]\nfor x, file in enumerate(CsvList):\n timeColumn = [\'TIME\']\n df = dd.read_csv(file[0], sep =\',\', skiprows = 3, encoding= \'CP932\', engine=\'python\', usecols=timeColumn)\n\n # The process became long when this code is added\n startEndList.append(list(df.head(1)) + list(df.tail(1))) …Run Code Online (Sandbox Code Playgroud) 关于黄昏。
我想将 parquet 文件读取为 df 并运行 groupby 函数
我的问题是为什么我应该先运行这段代码?
from dask.distributed import Client, progress
client = Client()
client
Run Code Online (Sandbox Code Playgroud)
不仅仅是
import dask.dataframe as dd
df = dd.read_parquet(r'C:\Users\ggg\mis_1.parquet')
g=df.groupby('id')['id'].count().compute()
Run Code Online (Sandbox Code Playgroud)
对我来说,没有客户它效果更好
我有两个在计算中相互依赖的数据帧,我想通过一次compute()调用获得两个数据帧的结果。代码可以总结如下:
import dask
import dask.dataframe
import dask.distributed
import pandas as pd
df = dask.dataframe.from_pandas(
pd.DataFrame({
"group": ['a', 'b', 'a', 'b', 'a', 'b', 'b'],
"var_1": [0, 1, 2, 1, 2, 1, 0],
"var_2": [1, 1, 2, 1, 2, 1, 0]}), npartitions=2)
with dask.distributed.Client() as client:
for i in range(10):
df_agg = foo(df)
df = bar(df, df_agg)
print(df.compute())
print(df_agg.compute()) # -> I would like to have only one .compute() call and get the results of both dataframes (df and df_agg) …Run Code Online (Sandbox Code Playgroud) 我是 dask 的新手,不明白计算()方法在 dask 中究竟做了什么?它是一种在它调用的地方打印对象的方法吗?我已经阅读了其网站上的文档,但不确定我是否理解“具体价值”和“懒惰”这两个术语。
您可以通过调用 .compute() 方法或 dask.compute(...) 函数将任何 dask 集合转换为具体值。此函数将阻塞直到计算完成,直接从惰性 dask 集合到本地内存中的具体值。
我的意思是说“它是一种在它调用的地方打印对象的方法吗?” 就是说,当我创建一个 dask 对象并在 spyder 控制台中调用它时,它会导致dask.array<arange, shape=(11,), dtype=int32, chunksize=(5,)>并且当我在其上调用 compute() 方法时,它会打印该对象。
x 对象已通过以下代码创建:
x = da.arange(11, chunks=5)
Run Code Online (Sandbox Code Playgroud) 我正在尝试计算一个非常大的数据集的移动平均值。行数约为30M。使用pandas来说明如下
df = pd.DataFrame({'cust_id':['a', 'a', 'a', 'b', 'b'], 'sales': [100, 200, 300, 400, 500]})
df['mov_avg'] = df.groupby("cust_id")["sales"].apply(lambda x: x.ewm(alpha=0.5, adjust=False).mean())
Run Code Online (Sandbox Code Playgroud)
这里我使用 pandas 来计算移动平均值。使用上面的方法,在 30M 数据集上计算大约需要 20 分钟。有没有办法在这里利用 DASK?
我在 GCP 上部署了一个 kubernetes 集群,结合了 prefect 和 dask。这些作业在正常情况下运行良好,但无法扩展到 2 倍的数据。到目前为止,我已将范围缩小到调度程序因内存使用率过高而关闭。 Dask 调度程序内存 一旦内存使用量达到 2GB,作业就会失败并出现“未检测到心跳”错误。
有一个单独的构建 python 文件可用,我们可以在其中设置工作内存和 cpu。有一个 dask-gateway 软件包,我们可以在其中获取网关选项并设置工作内存。
options.worker_memory = 32
options.worker_cores = 10
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=4, maximum=20)
Run Code Online (Sandbox Code Playgroud)
我无法弄清楚在哪里以及如何增加 dask-scheduler 的内存分配。
Specs:
Cluster Version: 1.19.14-gke.1900
Machine type - n1-highmem-64
Autoscaling set to 6 - 1000 nodes per zone
all nodes are allocated 63.77 CPU and 423.26 GB
Run Code Online (Sandbox Code Playgroud)