如何在 aiohttp 框架中使用 aiopg 正确设置后台 postgres 通知侦听器?

Iva*_*kin 6 python postgresql python-asyncio aiohttp

我使用 aiohttp 处理 websockets 和 postgresql 监听/通知,使用 aiopg 驱动程序作为服务器之间的传输队列。
我的服务器设置的一部分:

def init(loop):
    ...
    app = Application(loop=loop)
    app.on_startup.append(connect_db)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    return app


def create_app(loop):
    loop = asyncio.get_event_loop()
    app = init(loop)
    return app
Run Code Online (Sandbox Code Playgroud)

这是我的信号:

dsn = 'dbname=test user=root password=test host=127.0.0.1'

async def connect_db(app):
    app['db'] = await aiopg.connect(dsn)
    return app

async def listen_events_from_db(app):
    cursos = await app['db'].cursor()
    await cursos.execute("LISTEN test")
    try:
        while True:
            msg = await app['db'].notifies.get()
            # Here will be a coroutine which will send msg to connected websockets
            print('msg: ', msg.payload)
    except asyncio.CancelledError:
        pass
    finally:
        app['db'].close()

async def start_background_tasks(app):
    app['psql_listener'] = app.loop.create_task(listen_events_from_db(app))


async def cleanup_background_tasks(app):
    app['psql_listener'].cancel()
    await app['psql_listener']
Run Code Online (Sandbox Code Playgroud)

根据文档,我将后台任务包装为 try/except 并在on_cleanup信号中关闭任务。但在这种情况下,服务器因以下异常而失败:

...
raise CancelledError
concurrent.futures._base.CancelledError  
Run Code Online (Sandbox Code Playgroud)

如果我删除on_shutdown信号并捕获一个RuntimeError然后一切正常,除了关闭后关于销毁的警告listen_events_from_db()仍然未决。我想这是因为while循环。
这里是固定任务:

async def listen_events_from_db(app):
    cursos = await app['db'].cursor()
    await cursos.execute("LISTEN test")
    try:
        while True:
            msg = await app['db'].notifies.get()
            print('msg: ', msg.payload)
    except RuntimeError:
        pass
    finally:
        app['db'].close()
Run Code Online (Sandbox Code Playgroud)

我是 asyncio 和 aiohttp 框架的新手,感觉我做错了。请指出我正确的方向。