标签: dask-distributed

使用 dask 提交任务时出现 Pickle 错误

我正在尝试使用 dask(async) 框架执行一个简单的任务(一个实例方法),但它因序列化错误而失败。

有人可以指出我正确的方向。

这是我正在运行的代码:

from dask.distributed import Client, as_completed
import time

class DaskConnect:

def __init__(self):
    print("Initialized:",self.__class__.__name__)
    self.scheduler_host="192.168.0.4"
    self.scheduler_port="8786"

def connect(self):
    self.client = Client(self.scheduler_host+":"+self.scheduler_port)
    # self.client = Client()
    return self.client

def disconnect(self):
    self.client.close()

class TestDask:
def __init__(self):
    print("Initialized:",self.__class__.__name__)
    self.dask_client=DaskConnect().connect()

def do_task(self,msg):
    time.sleep(30)
    return msg

def run(self):
    tasks=[1]
    # tasks = [1, 2, 3, 4, 5]
    futures=[]
    for task in tasks:
        print("Submitting:",task)
        future = self.dask_client.submit(self.do_task, "Task:"+str(task))
        futures.append(future)

    for future in as_completed(futures):
        result = future.result()
        print("Result",result)

TestDask().run()
Run Code Online (Sandbox Code Playgroud)

错误:

分布式协议.pickle - 信息 - …

python dask dask-distributed

4
推荐指数
1
解决办法
1972
查看次数

Dask 表演:工作流程疑虑

我对如何从 dask 中获得最佳效果感到困惑。

问题 我有一个包含多个时间序列的数据帧(每个都有自己的时间序列key),我需要my_fun在每个时间序列上运行一个函数。使用 Pandas 解决它的一种方法涉及 df = list(df.groupby("key"))然后应用my_fun 多处理。尽管 RAM 使用量很大,但性能在我的机器上非常好,而在谷歌云计算上却很糟糕。

在 Dask 我目前的工作流程是:

import dask.dataframe as dd
from dask.multiprocessing import get
Run Code Online (Sandbox Code Playgroud)
  1. 从 S3 读取数据。14 个文件 -> 14 个分区
  2. `df.groupby("key").apply(my_fun).to_frame.compute(get=get)

因为我没有设置索引df.known_divisionsFalse

结果图是 在此处输入图片说明 我不明白我所看到的是否是瓶颈。

问题:

  1. df.npartitions作为倍数更好ncpu还是无关紧要?
  2. 这个似乎是更好地设置索引的关键。我的猜测是我可以做类似的事情

    df["key2"] = df["key"] df = df.set_index("key2")

但是,同样,我不知道这是否是最好的方法。

dask dask-distributed

4
推荐指数
1
解决办法
289
查看次数

本地 Dask 工作线程无法连接到本地调度程序

在 OSX 10.12.6 上运行 Dask 0.16.0 时,我无法将本地连接dask-worker到本地dask-scheduler. 我只是想遵循官方Dask 教程。重现步骤:

第 1 步:运行dask-scheduler

在此输入图像描述

第 2 步:运行dask-worker 10.160.39.103:8786

在此输入图像描述

该问题似乎与 dask 调度程序有关,而不是与工作人员有关,因为我什至无法通过其他方式访问该端口(例如nc -zv 10.160.39.103 8786)。

在此输入图像描述

然而,该进程显然仍在机器上运行:

在此输入图像描述

dask dask-distributed

4
推荐指数
1
解决办法
2509
查看次数

dask worker 如何访问集群中当前的 worker 总数?

我的 dask 工作人员需要运行init取决于集群中工作人员数量的代码。工作人员可以访问这样的集群元数据吗?

python dask dask-distributed

3
推荐指数
1
解决办法
1483
查看次数

将数据从 S3 加载到 dask 数据帧

只有在将文件公开后将“anon”参数更改为 True 时,我才能加载数据。

df = dd.read_csv('s3://mybucket/some-big.csv',  storage_options = {'anon':False})
Run Code Online (Sandbox Code Playgroud)

出于明显的原因,不建议这样做。如何安全地从 S3 加载数据?

python dask dask-distributed

3
推荐指数
1
解决办法
4226
查看次数

在分布式 dask 中提交工作函数,无需等待函数结束

我有这个使用该apscheduler库提交进程的 python 代码,它工作正常:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
array = [ 1, 3, 5, 7]

for elem in array:
    scheduler.add_job(function_to_submit, kwargs={ 'elem': elem })

scheduler.start()


def function_to_submit(elem):
     print(str(elem))
Run Code Online (Sandbox Code Playgroud)

请注意,进程是并行提交的,并且代码不会等待进程结束。

我需要的是将此代码迁移到dask distributed使用工作人员。我遇到的问题是,如果我使用dask提交方法,代码将等待所有函数结束,我需要代码继续。如何实现这一目标?

   client = Client('127.0.0.1:8786')
   future1 = client.submit(function_to_submit, 1)
   future3 = client.submit(function_to_submit, 3)
   L = [future1, future3]
   client.gather(L)  # <-- this waits until all the futures end
Run Code Online (Sandbox Code Playgroud)

python apscheduler dask dask-distributed

3
推荐指数
1
解决办法
1149
查看次数

使用 Dask 访问大型已发布数组中的单个元素

是否有一种更快的方法可以使用 Dask 仅检索大型已发布数组中的单个元素而不检索整个数组?

在下面的示例中,client.get_dataset('array1')[0] 与 client.get_dataset('array1') 花费的时间大致相同。

import distributed
client = distributed.Client()
data = [1]*10000000
payload = {'array1': data}
client.publish(**payload)

one_element = client.get_dataset('array1')[0]
Run Code Online (Sandbox Code Playgroud)

dask dask-delayed dask-distributed

2
推荐指数
1
解决办法
683
查看次数

Dask scatter 广播列表

使用 Dask 分布式分散广播列表的适当方法是什么?

案例 1 - 包装列表:

[future_list] = client.scatter([my_list], broadcast=True)
Run Code Online (Sandbox Code Playgroud)

情况 2 - 不包装列表:

future_list = client.scatter(my_list, broadcast=True)
Run Code Online (Sandbox Code Playgroud)

在 Dask 文档中,我看到了两个示例:1. wrapping(见底部示例)2. not wrapping。根据我的经验,案例 1 是最好的方法,在案例 2 中构建 Dask 图(在我的用例中很大)需要更长的时间。

什么可以解释图形构建时间的差异?这是预期的行为吗?

提前致谢。

托马斯

broadcast dask dask-distributed

2
推荐指数
1
解决办法
344
查看次数

如何让所有工人在dask中执行相同的任务?

我想让所有工人做同样的任务,像这样:

from dask import distributed
from distributed import Client,LocalCluster
import dask
import socket


def writer(filename,data):
    with open(filename,'w') as f:
        f.writelines(data)

def get_ip(x):
    return socket.gethostname()
    #writer('/data/1.txt',a)
client = Client('192.168.123.1:8786')

A=client.submit(get_ip, 0,workers=['w1','w2'], pure=False)
print(client.ncores(),
        client.scheduler_info()
#       dask.config.get('distributed')
     )
A.result()  
Run Code Online (Sandbox Code Playgroud)

我有 2 个工人,但只打印 1 个工人的主机名

dask dask-distributed

2
推荐指数
1
解决办法
952
查看次数

如何从 Dask 调度程序获取仪表板地址

启动 dask 分布式本地集群时,您可以为 .dask 设置随机端口或地址dashboard_address

如果你后来得到了这个scheduler对象。有没有办法提取仪表板的地址。

我有这个:

cluster = dask.distributed.LocalCluster(scheduler_port=0,
                                        dashboard_address='localhost:0')
scheduler = dask.distributed.Client(cluster, set_as_default=False)
scheduler_info = scheduler.scheduler_info()
logger.info('Scheduler: %s', scheduler_info['address'])
logger.info('Status Port: %s', scheduler_info['services']['dashboard'])
Run Code Online (Sandbox Code Playgroud)

但这只能获取仪表板的端口,而不是仪表板的 IP。如果我将仪表板地址放在调度程序之外的单独 IP 上,似乎很难知道它绑定到哪个 IP。

dask dask-distributed

2
推荐指数
1
解决办法
2526
查看次数