Rub*_*ben 2 python asynchronous python-asyncio asyncpg
我有一个服务网络套接字的类,并听 PostgreSQL。使用 asyncpg,当我尝试使用 add_listener 时,出现错误:RuntimeWarning: coroutine was never awaited。如何异步/等待回调。我尝试添加“等待 self.listener”,但它不起作用。
有没有办法以另一种方式处理这个问题?
import asyncio
import http
import websockets
import asyncpg
class App(object):
def __init__(self, loop):
self.loop = loop
self.ws_list = []
self.conn = None
async def ws_handler(self, ws, path):
if self.conn is None:
self.conn = await asyncpg.connect(user='xxx', password='xxx', database='pgws', host='127.0.0.1')
await self.conn.add_listener('todo_updates', self.listener)
print('new socket!!!')
self.ws_list.append(ws)
while True:
await asyncio.sleep(1)
async def listener(self, conn, pid, channel, payload):
print(payload)
for ws in self.ws_list:
task = asyncio.create_task()
await ws.send(payload)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
app = App(loop)
start_server = websockets.serve(app.ws_handler, 'localhost', 8766)
app.loop.run_until_complete(start_server)
app.loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
小智 6
问题是您传递给的回调asyncpg.Connection.add_listener()是一个协程函数,但它应该是一个简单的同步函数。asyncpg 不会引发错误,因为从技术上讲,它仍然是一个需要连接、pid、通道和有效负载的可调用对象,但它在调用时的行为并不像您预期的那样。
要从同步回调中调用异步函数(同时事件循环已经在运行),您需要使用类似asyncio.create_task()(在 Python >=3.7 中)或loop.create_task()(在 Python >=3.4.2 中)或asyncio.ensure_future()(在 Python >=3.4 中) .4),像这样:
class App:
... # Your other code here
def listener(self, conn, pid, channel, payload):
print(payload)
for ws in self.ws_list:
asyncio.create_task(ws.send(payload))
Run Code Online (Sandbox Code Playgroud)
请注意asyncio.create_task()(和其他上述函数)将立即返回,并且不会等待任务完成。任务将被安排在await其他地方一个或多个s之后运行。