Iva*_*kin 6 python postgresql python-asyncio aiohttp
我使用 aiohttp 处理 websockets 和 postgresql 监听/通知,使用 aiopg 驱动程序作为服务器之间的传输队列。
我的服务器设置的一部分:
def init(loop):
...
app = Application(loop=loop)
app.on_startup.append(connect_db)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
return app
def create_app(loop):
loop = asyncio.get_event_loop()
app = init(loop)
return app
Run Code Online (Sandbox Code Playgroud)
这是我的信号:
dsn = 'dbname=test user=root password=test host=127.0.0.1'
async def connect_db(app):
app['db'] = await aiopg.connect(dsn)
return app
async def listen_events_from_db(app):
cursos = await app['db'].cursor()
await cursos.execute("LISTEN test")
try:
while True:
msg = await app['db'].notifies.get()
# Here will be a coroutine which will send msg to connected websockets
print('msg: ', msg.payload)
except asyncio.CancelledError:
pass
finally:
app['db'].close()
async def start_background_tasks(app):
app['psql_listener'] = app.loop.create_task(listen_events_from_db(app))
async def cleanup_background_tasks(app):
app['psql_listener'].cancel()
await app['psql_listener']
Run Code Online (Sandbox Code Playgroud)
根据文档,我将后台任务包装为 try/except 并在on_cleanup信号中关闭任务。但在这种情况下,服务器因以下异常而失败:
Run Code Online (Sandbox Code Playgroud)... raise CancelledError concurrent.futures._base.CancelledError
如果我删除on_shutdown信号并捕获一个RuntimeError然后一切正常,除了关闭后关于销毁的警告listen_events_from_db()仍然未决。我想这是因为while循环。
这里是固定任务:
async def listen_events_from_db(app):
cursos = await app['db'].cursor()
await cursos.execute("LISTEN test")
try:
while True:
msg = await app['db'].notifies.get()
print('msg: ', msg.payload)
except RuntimeError:
pass
finally:
app['db'].close()
Run Code Online (Sandbox Code Playgroud)
我是 asyncio 和 aiohttp 框架的新手,感觉我做错了。请指出我正确的方向。
| 归档时间: |
|
| 查看次数: |
622 次 |
| 最近记录: |