请解释"任务已被破坏,但它正在等待!"

Dan*_*iel 21 python python-3.x python-3.4 python-asyncio

Python 3.4.2

我正在学习asyncio,我用它来不断听IPC总线,而gbulb听dbus.

一些旁注:

所以我创建了一个函数listen_to_ipc_channel_layer,它不断地监听IPC通道上的传入消息并将消息传递给message_handler.

我也在听SIGTERM和SIGINT.因此,当我向运行您在底部找到的代码的python进程发送SIGTERM时,脚本应该正常终止.

问题

......我有以下警告:

got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

...使用以下代码:

import asyncio
import gbulb
import signal
import asgi_ipc as asgi

def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()

@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break

def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()


if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()
Run Code Online (Sandbox Code Playgroud)

我仍然只了解asyncio,但我想我知道发生了什么.在等待yield from asyncio.sleep(0.1)信号处理程序捕获SIGTERM并在该进程中调用它task.cancel().

问题抛出:如果不是这触发CancelledError了内while True:循环?(因为它不是,但这就是我的理解"调用cancel()会将一个CancelledError抛给包装的协同程序").

最终loop.stop()被调用,它停止循环而不等待yield from asyncio.sleep(0.1)返回结果甚至整个协程listen_to_ipc_channel_layer.

如果我错了,请纠正我.

我认为我唯一需要做的就是让我的程序等待yield from asyncio.sleep(0.1)返回结果和/或协同程序以打破while循环并完成.

我相信我混淆了很多东西.请帮我把这些东西直接搞定,这样我就可以弄清楚如何在没有警告的情况下优雅地关闭事件循环.

Yer*_*iaz 16

问题来自于取消任务后立即关闭循环.正如cancel()docs所述

"这会安排在事件循环的下一个循环中将CancelledError抛入包装的协程."

拿这段代码:

import asyncio
import signal


async def pending_doom():
    await asyncio.sleep(2)
    print(">> Cancelling tasks now")
    for task in asyncio.Task.all_tasks():
        task.cancel()

    print(">> Done cancelling tasks")
    asyncio.get_event_loop().stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()


async def looping_coro():
    print("Executing coroutine")
    while True:
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")
            break

        print("Done waiting")

    print("Done executing coroutine")
    asyncio.get_event_loop().stop()


def main():
    asyncio.async(pending_doom())
    asyncio.async(looping_coro())

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()

    # I had to manually remove the handlers to
    # avoid an exception on BaseEventLoop.__del__
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

注意ask_exit取消任务但不stop循环,在下一个循环looping_coro()停止它.取消输出的输出是:

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine
Run Code Online (Sandbox Code Playgroud)

注意如何在之后立即pending_doom取消和停止循环.如果你让它运行直到协同程序从睡眠中醒来,你可以看到你得到的相同警告:pending_doom

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>
Run Code Online (Sandbox Code Playgroud)


I15*_*159 7

问题的意思是循环没有时间来完成所有任务。

这安排在事件循环的下一个周期将CancelledError引发到包装的协程中。

您的方法中没有机会进行循环的“下一个循环”。为了使其正确运行,您应该将停止操作移至单独的非循环协程,以使循环有机会完成。

第二件事是CancelledError提高。

与Future.cancel()不同,这不能保证任务将被取消:可能会捕获并执行异常,从而延迟了任务的取消或完全阻止了取消。该任务还可能返回值或引发其他异常。

调用此方法后,canceled()不会立即返回True(除非任务已被取消)。当包装的协程以CancelledError异常终止(即使未调用cancel())时,任务也会被标记为已取消。

因此,清理后您的协程必须升起CancelledError以标记为已取消。

使用额外的协程停止循环不是问题,因为它不是循环的,并且在执行后立即完成。

def main():                                              
    loop = asyncio.get_event_loop()                      
    asyncio.ensure_future(listen_to_ipc_channel_layer()) 

    for sig in (signal.SIGINT, signal.SIGTERM):          
        loop.add_signal_handler(sig, ask_exit)           
    loop.run_forever()                                   
    print("Close")                                       
    loop.close()                                         


@asyncio.coroutine                                       
def listen_to_ipc_channel_layer():                       
    while True:                                          
        try:                                             
            print("Running")                                 
            yield from asyncio.sleep(0.1)                
        except asyncio.CancelledError as e:              
            print("Break it out")                        
            raise e # Raise a proper error


# Stop the loop concurrently           
@asyncio.coroutine                                       
def exit():                                              
    loop = asyncio.get_event_loop()                      
    print("Stop")                                        
    loop.stop()                                          


def ask_exit():                          
    for task in asyncio.Task.all_tasks():
        task.cancel()                    
    asyncio.ensure_future(exit())        


if __name__ == "__main__":               
    main()                               
Run Code Online (Sandbox Code Playgroud)


小智 7

我收到此消息,我相信这是由待处理任务的垃圾收集引起的。Python 开发人员正在争论在 asyncio 中创建的任务是否应该创建强引用,并决定不应该(在研究这个问题 2 天后,我强烈不同意!...请参阅此处的讨论https://bugs.python.org/问题21163

我为自己创建了这个实用程序,以对任务进行强引用并自动清理它(仍然需要彻底测试它)...

import asyncio

#create a strong reference to tasks since asyncio doesn't do this for you
task_references = set()

def register_ensure_future(coro):
    task = asyncio.ensure_future(coro)
    task_references.add(task)

    # Setup cleanup of strong reference on task completion...
    def _on_completion(f):
        task_references.remove(f)
    task.add_done_callback(_on_completion)
    
    return task
Run Code Online (Sandbox Code Playgroud)

在我看来,只要任务处于活动状态,就应该具有很强的参考性!但是 asyncio 不会为你做这些,所以一旦发生 gc 和长时间的调试,你可能会遇到一些糟糕的意外。