在 dask 产生的进程中调用 dask

Sam*_*our 4 python multiprocessing dask

我们有一个包含众多任务的大型项目。我们使用 dask 图来安排每个任务。该图的一个小样本如下。请注意,dask 设置为多处理模式。

dask_graph:

  universe: !!python/tuple [gcsstrategies.svc.business_service.UniverseService.load_universe_object, CONTEXT]
  raw_market_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_market_data, CONTEXT, universe]
  raw_fundamental_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_fundamental_data, CONTEXT, universe]

dask_keys: [raw_fundamental_data]
Run Code Online (Sandbox Code Playgroud)

现在的任务之一,raw_fundamental_data懒洋洋地安排使用DASK任务,@delay并使用游dask.compute()。这种设计选择的原因是将raw_fundamental_data在运行时根据运行时参数动态选择由 dask inside 调度和延迟运行的任务列表。

我们看到的错误是:

守护进程不允许有子进程

我们理解这是因为生成的进程试图生成子进程。这个问题有什么解决办法吗?dask 是否有任何方法允许通过 daskgraph 安排的任务使用@delay或其他方法来安排和延迟运行自己的任务。

请注意,在我们的系统中,有许多任务将使用多处理运行自己的任务。所以顺序执行不是一种选择。

MRo*_*lin 5

多处理调度程序不能进行这种操作。但是,分布式调度程序是(您也可以轻松地在单台机器上使用分布式调度程序。

相关文档页面在这里:

这是一个小例子

In [1]: from dask.distributed import Client, local_client

In [2]: def f(n):
   ...:     with local_client() as lc:
   ...:         futures = [lc.submit(lambda x: x + 1, i) for i in range(n)]
   ...:         total = lc.submit(sum, futures)
   ...:         return total.result()
   ...:     

In [3]: c = Client()  # start processes on local machine

In [4]: future = c.submit(f, 10)

In [5]: future.result()
Out[5]: 55
Run Code Online (Sandbox Code Playgroud)

这使用 concurrent.futures 接口 dask 而不是dask.delayed,但您也可以使用 dask.delayed 。请参阅http://distributed.readthedocs.io/en/latest/manage-computation.html