AIORedis和PUB / SUB不是asnyc

Glu*_*eon 2 python asynchronous python-asyncio

我使用aioredis编写了异步服务,该服务将在某个频道上侦听并以异步方式运行一些命令。

基本上,我从示例页面中获取了一个代码来编写一个小型测试应用程序,并删除了不必要的部分:

import asyncio
import aioredis

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print('Got Message:', msg)
        i = int(msg['sleep_for'])
        print('Sleep for {}'.format(i))
        await asyncio.sleep(i)
        print('End sleep')


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Run Code Online (Sandbox Code Playgroud)

还有另一个测试应用程序,它发布带有sleep_for字段的json blob ,然后在订阅者应用程序中使用该字段reader使用sleep语句来模拟协程内部的某些工作。

我希望“睡眠”以“并行”方式运行,但实际上它们以同步的方式出现在屏幕上,一个接一个。

我的猜测是,只要打到await ch.get_json(..)(或什至await ch.wait_message())行,我就应该能够处理下一条消息。在实践中,它像同步代码一样运行。我哪里错了?这可以使用连接池来处理,但这意味着存在一些不异步的问题,也不知道确切的含义。

Jas*_*ohi 5

我的猜测是,只要等待a。ch.get_json(..)(甚至是a.ch.wait_message())这一行,我就应该能够处理下一条消息。

async/await语法不是这样工作的。每次await在协同程序中命中a 时,该协同程序都会被“暂停”,从而控制了所谓的协同程序。如果正在休眠,它不会自动处理下一条消息。

您应该使用的ensure_future是在单独的协程中处理每个消息:

import asyncio
import aioredis

async def handle_msg(msg):
    print('Got Message:', msg)
    i = int(msg['sleep_for'])
    print('Sleep for {}'.format(i))
    await asyncio.sleep(i)
    print('End sleep')

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        asyncio.ensure_future(handle_msg(msg))

async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() 
Run Code Online (Sandbox Code Playgroud)