使用 dask 分布式我尝试提交位于另一个名为 worker.py 的文件中的函数。在工人中,我有以下错误:
没有名为“worker”的模块
但是我无法弄清楚我在这里做错了什么......
这是我的代码示例:
import worker
def run(self):
dask_queue = queue.Queue()
remote_queue = self.executor.scatter(dask_queue)
map_queue = self.executor.map(worker.run, remote_queue)
result = self.executor.gather(map_queue)
# Load data into the queue
for option in self.input.get_next_option():
remote_queue.put([self.server, self.arg, option])
Run Code Online (Sandbox Code Playgroud)
这是在工作端获得的完整回溯:
分布式核心 - 信息 - 无法反序列化 b'\x80\x04\x95\x19\x00\x00\x00\x00\x00\x00\x00\x8c\x06worker\x94\x8c\nrun\x94\x93\x94。 ' 回溯(最近一次调用):文件“/usr/local/lib/python3.5/dist-packages/distributed/core.py”,第 74 行,加载返回 pickle.loads(x) ImportError: No module named '工人'distributed.worker - 警告 - 无法反序列化任务回溯(最近一次调用):文件“/usr/local/lib/python3.5/dist-packages/distributed/worker.py”,第496行,在compute_one任务中) 文件“/usr/local/lib/python3.5/dist-packages/distributed/worker.py”,第 284 行,反序列化函数 = 加载(函数)文件“/usr/local/lib/python3.5/dist -packages/distributed/core.py”,第 74 行,在负载中返回 pickle.loads(x) 导入错误:
我也面临类似的问题。创建 dask 图时使用了 python 模块中的函数。但是,工作进程找不到python模块。
工作控制台中出现以下错误。这里,tasks.py 包含 dask graph 中使用的工作函数。
[ worker 10.0.2.4 ] : ModuleNotFoundError: No module named 'tasks'
[ worker 10.0.2.4 ] : distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x05tasks\x94\x8c\x06ogs_mk\x94\x93\x94.'
Run Code Online (Sandbox Code Playgroud)
当使用Client.upload_file (如下所示)将本地 python 模块发送给工作人员时,问题得到了解决。
client.upload_file('tasks.py') ## Send local package to workers
results = client.get(dsk, 'root') ## get the results
Run Code Online (Sandbox Code Playgroud)
有两种情况可能会出现此问题:未找到调用 dask-distributed 函数的主代码中的导入,或者未找到 dask-distributed 函数内的导入。无论哪种方式,解决方案都是更新 sys.path 以指向这些模块所在的位置。
就我而言,我更新了两者。
例如,假设在您的主脚本中您有模块 xxx,在您想要分发的 dask 函数中您有模块 yyy。可能应该是这样的:
from dask.distributed import Client
import sys
def update_syspath():
sys.path.insert(0, 'module_xxx_location')
# you have to update sys.path first before import the xxx module
import xxx
def dask_function():
sys.path.insert(0, 'module_yyy_location')
import yyy
client.submit(dask_function, params)
Run Code Online (Sandbox Code Playgroud)
编辑:请参阅 MRocklin 评论以获得更清洁的解决方案
实际上,如果要在 dask Worker 中执行的代码位于外部模块中,则必须从 Dask Worker 路径知道它(它不是从客户端序列化到 Worker)。
更改我的 PYTHONPATH 以确保工作人员知道该模块解决了问题。dask issues 中发布了类似的问题:
https://github.com/dask/distributed/issues/344