Bre*_*ire 13 python event-loop executor python-asyncio
我想在Executor中使用asyncio调用loop.run_in_executor启动阻塞函数,然后稍后取消它,但这对我来说似乎不起作用.
这是代码:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)
print('done non blocking {}'.format(seconds))
@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我希望上面的代码只允许阻塞函数输出:
blocking 0/5
blocking 1/5
Run Code Online (Sandbox Code Playgroud)
然后查看非阻塞函数的输出.但是,即使在我取消之后,封锁的未来仍在继续.
可能吗?还有其他方法吗?
谢谢
编辑:关于使用asyncio运行阻塞和非阻塞代码的更多讨论:如何使用asyncio 连接阻塞和非阻塞代码
dan*_*ano 17
在这种情况下,Future一旦它实际开始运行就无法取消它,因为你依赖于它的行为concurrent.futures.Future,并且它的文档陈述如下:
cancel()尝试取消通话.如果当前正在执行调用并且无法取消,则该方法将返回
False,否则将取消调用并返回该方法True.
因此,取消成功的唯一时间是任务仍在待处理Executor.现在,你实际上正在使用一个asyncio.Future包裹着的concurrent.futures.Future,实际上,如果你在调用之后尝试它,asyncio.Future返回的loop.run_in_executor()将会引发一个,即使底层任务实际上已经在运行.但是,它实际上不会取消内部任务的执行.CancellationErroryield fromcancel()Executor
如果您需要实际取消任务,则需要使用更传统的方法来中断线程中运行的任务.您如何做的具体细节是用例依赖.对于您在示例中提供的用例,您可以使用threading.Event:
def blocking_func(seconds_to_block, event):
for i in range(seconds_to_block):
if event.is_set():
return
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel() # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
Run Code Online (Sandbox Code Playgroud)