在分布式 dask 中提交工作函数,无需等待函数结束

ps0*_*604 3 python apscheduler dask dask-distributed

我有这个使用该apscheduler库提交进程的 python 代码,它工作正常:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
array = [ 1, 3, 5, 7]

for elem in array:
    scheduler.add_job(function_to_submit, kwargs={ 'elem': elem })

scheduler.start()


def function_to_submit(elem):
     print(str(elem))
Run Code Online (Sandbox Code Playgroud)

请注意,进程是并行提交的,并且代码不会等待进程结束。

我需要的是将此代码迁移到dask distributed使用工作人员。我遇到的问题是,如果我使用dask提交方法,代码将等待所有函数结束,我需要代码继续。如何实现这一目标?

   client = Client('127.0.0.1:8786')
   future1 = client.submit(function_to_submit, 1)
   future3 = client.submit(function_to_submit, 3)
   L = [future1, future3]
   client.gather(L)  # <-- this waits until all the futures end
Run Code Online (Sandbox Code Playgroud)

Mic*_*ado 6

Dask 分布式有一种fire_and_forget方法可以替代,例如client.compute,或者dask.distributed.wait如果您希望调度程序保留任务,即使 future 超出了提交任务的 python 进程的范围。

from dask.distributed import Client, fire_and_forget
client = Client('127.0.0.1:8786')
fire_and_forget(client.submit(function_to_submit, 1))
fire_and_forget(client.submit(function_to_submit, 3))

# your script can now end and the scheduler will
# continue executing these until they end or the
# scheduler is terminated
Run Code Online (Sandbox Code Playgroud)

有关示例和其他使用模式,请参阅Futures 上的 dask.distributed 文档。