如何在单个线程中运行完整的Dask.distributed集群?我想用它来进行调试或分析.
注意:这是一个经常被问到的问题.我在这里将问题和答案添加到Stack Overflow中,以便将来重用.
MRo*_*lin 15
如果您可以使用单机调度程序的API(只是计算),那么您可以使用单线程调度程序
x.compute(scheduler='single-threaded')
Run Code Online (Sandbox Code Playgroud)
如果要在单个计算机上运行dask.distributed集群,则可以不带参数启动客户端
from dask.distributed import Client
client = Client() # Starts local cluster
x.compute()
Run Code Online (Sandbox Code Playgroud)
这使用许多线程但在一台机器上运行
或者,如果您想在单个进程中运行所有内容,则可以使用该processes=False关键字
from dask.distributed import Client
client = Client(processes=False) # Starts local cluster
x.compute()
Run Code Online (Sandbox Code Playgroud)
所有的通信和控制都发生在一个线程中,尽管计算发生在一个单独的线程池中.
要在一个线程中运行控制,通信和计算,您需要创建一个Tornado concurrent.futures Executor.请注意,此Tornado API可能不公开.
from dask.distributed import Scheduler, Worker, Client
from tornado.concurrent import DummyExecutor
from tornado.ioloop import IOLoop
import threading
loop = IOLoop()
e = DummyExecutor()
s = Scheduler(loop=loop)
s.start()
w = Worker(s.address, loop=loop, executor=e)
loop.add_callback(w._start)
async def f():
async with Client(s.address, start=False) as c:
future = c.submit(threading.get_ident)
result = await future
return result
>>> threading.get_ident() == loop.run_sync(f)
True
Run Code Online (Sandbox Code Playgroud)