asyncio:是否有可能取消Executor运行的未来?

Bre*_*ire 13 python event-loop executor python-asyncio

我想在Executor中使用asyncio调用loop.run_in_executor启动阻塞函数,然后稍后取消它,但这对我来说似乎不起作用.

这是代码:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

我希望上面的代码只允许阻塞函数输出:

blocking 0/5
blocking 1/5
Run Code Online (Sandbox Code Playgroud)

然后查看非阻塞函数的输出.但是,即使在我取消之后,封锁的未来仍在继续.

可能吗?还有其他方法吗?

谢谢

编辑:关于使用asyncio运行阻塞和非阻塞代码的更多讨论:如何使用asyncio 连接阻塞和非阻塞代码

dan*_*ano 17

在这种情况下,Future一旦它实际开始运行就无法取消它,因为你依赖于它的行为concurrent.futures.Future,并且它的文档陈述如下:

cancel()

尝试取消通话.如果当前正在执行调用并且无法取消,则该方法将返回False,否则将取消调用并返回该方法True.

因此,取消成功的唯一时间是任务仍在待处理Executor.现在,你实际上正在使用一个asyncio.Future包裹着的concurrent.futures.Future,实际上,如果你在调用之后尝试它,asyncio.Future返回的loop.run_in_executor()将会引发一个,即使底层任务实际上已经在运行.但是,它实际上不会取消内部任务的执行.CancellationErroryield fromcancel()Executor

如果您需要实际取消任务,则需要使用更传统的方法来中断线程中运行的任务.您如何做的具体细节是用例依赖.对于您在示例中提供的用例,您可以使用threading.Event:

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
Run Code Online (Sandbox Code Playgroud)