我正在尝试使用 dask(async) 框架执行一个简单的任务(一个实例方法),但它因序列化错误而失败。
有人可以指出我正确的方向。
这是我正在运行的代码:
from dask.distributed import Client, as_completed
import time
class DaskConnect:
def __init__(self):
print("Initialized:",self.__class__.__name__)
self.scheduler_host="192.168.0.4"
self.scheduler_port="8786"
def connect(self):
self.client = Client(self.scheduler_host+":"+self.scheduler_port)
# self.client = Client()
return self.client
def disconnect(self):
self.client.close()
class TestDask:
def __init__(self):
print("Initialized:",self.__class__.__name__)
self.dask_client=DaskConnect().connect()
def do_task(self,msg):
time.sleep(30)
return msg
def run(self):
tasks=[1]
# tasks = [1, 2, 3, 4, 5]
futures=[]
for task in tasks:
print("Submitting:",task)
future = self.dask_client.submit(self.do_task, "Task:"+str(task))
futures.append(future)
for future in as_completed(futures):
result = future.result()
print("Result",result)
TestDask().run()
Run Code Online (Sandbox Code Playgroud)
错误:
分布式协议.pickle - 信息 - …
我对如何从 dask 中获得最佳效果感到困惑。
问题
我有一个包含多个时间序列的数据帧(每个都有自己的时间序列key),我需要my_fun在每个时间序列上运行一个函数。使用 Pandas 解决它的一种方法涉及
df = list(df.groupby("key"))然后应用my_fun
多处理。尽管 RAM 使用量很大,但性能在我的机器上非常好,而在谷歌云计算上却很糟糕。
在 Dask 我目前的工作流程是:
import dask.dataframe as dd
from dask.multiprocessing import get
Run Code Online (Sandbox Code Playgroud)
因为我没有设置索引df.known_divisions是False
问题:
df.npartitions作为倍数更好ncpu还是无关紧要?从这个似乎是更好地设置索引的关键。我的猜测是我可以做类似的事情
df["key2"] = df["key"] df = df.set_index("key2")
但是,同样,我不知道这是否是最好的方法。
在 OSX 10.12.6 上运行 Dask 0.16.0 时,我无法将本地连接dask-worker到本地dask-scheduler. 我只是想遵循官方Dask 教程。重现步骤:
第 1 步:运行dask-scheduler
第 2 步:运行dask-worker 10.160.39.103:8786
该问题似乎与 dask 调度程序有关,而不是与工作人员有关,因为我什至无法通过其他方式访问该端口(例如nc -zv 10.160.39.103 8786)。
然而,该进程显然仍在机器上运行:
我的 dask 工作人员需要运行init取决于集群中工作人员数量的代码。工作人员可以访问这样的集群元数据吗?
只有在将文件公开后将“anon”参数更改为 True 时,我才能加载数据。
df = dd.read_csv('s3://mybucket/some-big.csv', storage_options = {'anon':False})
Run Code Online (Sandbox Code Playgroud)
出于明显的原因,不建议这样做。如何安全地从 S3 加载数据?
我有这个使用该apscheduler库提交进程的 python 代码,它工作正常:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
array = [ 1, 3, 5, 7]
for elem in array:
scheduler.add_job(function_to_submit, kwargs={ 'elem': elem })
scheduler.start()
def function_to_submit(elem):
print(str(elem))
Run Code Online (Sandbox Code Playgroud)
请注意,进程是并行提交的,并且代码不会等待进程结束。
我需要的是将此代码迁移到dask distributed使用工作人员。我遇到的问题是,如果我使用dask提交方法,代码将等待所有函数结束,我需要代码继续。如何实现这一目标?
client = Client('127.0.0.1:8786')
future1 = client.submit(function_to_submit, 1)
future3 = client.submit(function_to_submit, 3)
L = [future1, future3]
client.gather(L) # <-- this waits until all the futures end
Run Code Online (Sandbox Code Playgroud) 是否有一种更快的方法可以使用 Dask 仅检索大型已发布数组中的单个元素而不检索整个数组?
在下面的示例中,client.get_dataset('array1')[0] 与 client.get_dataset('array1') 花费的时间大致相同。
import distributed
client = distributed.Client()
data = [1]*10000000
payload = {'array1': data}
client.publish(**payload)
one_element = client.get_dataset('array1')[0]
Run Code Online (Sandbox Code Playgroud) 使用 Dask 分布式分散广播列表的适当方法是什么?
案例 1 - 包装列表:
[future_list] = client.scatter([my_list], broadcast=True)
Run Code Online (Sandbox Code Playgroud)
情况 2 - 不包装列表:
future_list = client.scatter(my_list, broadcast=True)
Run Code Online (Sandbox Code Playgroud)
在 Dask 文档中,我看到了两个示例:1. wrapping(见底部示例)和2. not wrapping。根据我的经验,案例 1 是最好的方法,在案例 2 中构建 Dask 图(在我的用例中很大)需要更长的时间。
什么可以解释图形构建时间的差异?这是预期的行为吗?
提前致谢。
托马斯
我想让所有工人做同样的任务,像这样:
from dask import distributed
from distributed import Client,LocalCluster
import dask
import socket
def writer(filename,data):
with open(filename,'w') as f:
f.writelines(data)
def get_ip(x):
return socket.gethostname()
#writer('/data/1.txt',a)
client = Client('192.168.123.1:8786')
A=client.submit(get_ip, 0,workers=['w1','w2'], pure=False)
print(client.ncores(),
client.scheduler_info()
# dask.config.get('distributed')
)
A.result()
Run Code Online (Sandbox Code Playgroud)
我有 2 个工人,但只打印 1 个工人的主机名
启动 dask 分布式本地集群时,您可以为 .dask 设置随机端口或地址dashboard_address。
如果你后来得到了这个scheduler对象。有没有办法提取仪表板的地址。
我有这个:
cluster = dask.distributed.LocalCluster(scheduler_port=0,
dashboard_address='localhost:0')
scheduler = dask.distributed.Client(cluster, set_as_default=False)
scheduler_info = scheduler.scheduler_info()
logger.info('Scheduler: %s', scheduler_info['address'])
logger.info('Status Port: %s', scheduler_info['services']['dashboard'])
Run Code Online (Sandbox Code Playgroud)
但这只能获取仪表板的端口,而不是仪表板的 IP。如果我将仪表板地址放在调度程序之外的单独 IP 上,似乎很难知道它绑定到哪个 IP。