Sam*_*our 4 python multiprocessing dask
我们有一个包含众多任务的大型项目。我们使用 dask 图来安排每个任务。该图的一个小样本如下。请注意,dask 设置为多处理模式。
dask_graph:
universe: !!python/tuple [gcsstrategies.svc.business_service.UniverseService.load_universe_object, CONTEXT]
raw_market_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_market_data, CONTEXT, universe]
raw_fundamental_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_fundamental_data, CONTEXT, universe]
dask_keys: [raw_fundamental_data]
Run Code Online (Sandbox Code Playgroud)
现在的任务之一,raw_fundamental_data懒洋洋地安排使用DASK任务,@delay并使用游dask.compute()。这种设计选择的原因是将raw_fundamental_data在运行时根据运行时参数动态选择由 dask inside 调度和延迟运行的任务列表。
我们看到的错误是:
守护进程不允许有子进程
我们理解这是因为生成的进程试图生成子进程。这个问题有什么解决办法吗?dask 是否有任何方法允许通过 daskgraph 安排的任务使用@delay或其他方法来安排和延迟运行自己的任务。
请注意,在我们的系统中,有许多任务将使用多处理运行自己的任务。所以顺序执行不是一种选择。
多处理调度程序不能进行这种操作。但是,分布式调度程序是(您也可以轻松地在单台机器上使用分布式调度程序。
相关文档页面在这里:
In [1]: from dask.distributed import Client, local_client
In [2]: def f(n):
...: with local_client() as lc:
...: futures = [lc.submit(lambda x: x + 1, i) for i in range(n)]
...: total = lc.submit(sum, futures)
...: return total.result()
...:
In [3]: c = Client() # start processes on local machine
In [4]: future = c.submit(f, 10)
In [5]: future.result()
Out[5]: 55
Run Code Online (Sandbox Code Playgroud)
这使用 concurrent.futures 接口 dask 而不是dask.delayed,但您也可以使用 dask.delayed 。请参阅http://distributed.readthedocs.io/en/latest/manage-computation.html
| 归档时间: |
|
| 查看次数: |
661 次 |
| 最近记录: |