当键可用时,python asyncio从字典中按键异步获取数据

lns*_*shi 5 python python-3.x async-await python-asyncio aiohttp

正如标题所说,我的用例是这样的:

我有一个 aiohttp 服务器,它接受来自客户端的请求,当我收到请求时,我为它生成一个唯一的请求 ID,然后我将{req_id: req_pyaload}dict发送给一些工作人员(工作人员不在 python 中,因此在另一个进程中运行),当工人完成的工作,我回来的响应,并把它们放到一个结果字典是这样的:{req_id_1: res_1, req_id_2: res_2}

然后我希望我的 aiohttp 服务器处理程序await在上面result dict,所以当特定的响应变得可用时(通过 req_id)它可以将它发送回来。

下面的示例代码,我建立以尽量模拟过程中,却被困在执行协程async def fetch_correct_res(req_id)异步/ unblockly获取由正确的响应req_id

import random
import asyncio
import shortuuid

n_tests = 1000

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
    req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id):
    pass

async def handler(req):
    res = await fetch_correct_res(req)
    assert req == res, "the correct res for the req should exactly be the req itself."
    print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    for _ in range(n_tests):
        random_idx = random.choice(idxs)
        await asyncio.sleep(random_idx / 1000)
        res_dict[req_ids[random_idx]] = req_ids[random_idx]
        print("req: {} is back".format(req_ids[random_idx]))
Run Code Online (Sandbox Code Playgroud)

所以:

  1. 是否可以使此解决方案起作用?如何?

  2. 如果上述解决方案是不可能的,那么对于这个使用 asyncio 的用例,正确的解决方案应该是什么?

非常感谢。


我现在能想到的唯一方法是:asyncio.Queue用预先分配的 id预先创建一些,然后为每个传入的请求分配一个队列,所以处理程序就await在这个队列上,当响应回来时我仅将它放入这个预先分配的队列中,在请求完成后,我收集队列以将其用于下一个传入请求。不是很优雅,但会解决问题。

小智 5

看看下面的示例实现是否满足您的需求

基本上你想以异步方式用你的响应(无法预测顺序)响应请求(id)

因此,在处理请求时,用{request_id: {'event':<async.Event>, 'result': <result>}}and awaiton填充字典asyncio.Event.wait(),一旦收到响应,就发出信号通知事件,该事件asyncio.Event.set()将释放等待,然后根据请求 id 从字典中获取响应

我稍微修改了你的代码,用请求 id 预先填充字典,并打开await直到asyncio.Event.wait()信号来自响应

import random
import asyncio
import shortuuid

n_tests = 10

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
    req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id, event):
  await event.wait()
  res = res_dict[req_id]['result']
  return res

async def handler(req, loop):
      print("incoming request id: {}".format(req))
      event = asyncio.Event()
      data = {req :{}}
      res_dict.update(data)
      res_dict[req]['event']=event
      res_dict[req]['result']='pending'
      res = await fetch_correct_res(req, event)
      assert req == res, "the correct res for the req should exactly be the req itself."
      print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    random.shuffle(req_ids)
    for i in req_ids:
        await asyncio.sleep(random.randrange(2,4))
        print("req: {} is back".format(i))
        if res_dict.get(i) is not None:
          event = res_dict[i]['event']
          res_dict[i]['result'] = i
          event.set()  

loop = asyncio.get_event_loop()
tasks = asyncio.gather(handler(req_ids[0], loop),
          handler(req_ids[1], loop),
          handler(req_ids[2], loop),
          handler(req_ids[3], loop),
          randomly_put_res_to_res_dict())
loop.run_until_complete(tasks)
loop.close()
Run Code Online (Sandbox Code Playgroud)

上述代码的示例响应

incoming request id: NDhvBPqMiRbteFD5WqiLFE
incoming request id: fpmk8yC3iQcgHAJBKqe2zh
incoming request id: M7eX7qeVQfWCCBnP4FbRtK
incoming request id: v2hAfcCEhRPUDUjCabk45N
req: VeyvAEX7YGgRZDHqa2UGYc is back
req: M7eX7qeVQfWCCBnP4FbRtK is back
got correct res for req: M7eX7qeVQfWCCBnP4FbRtK
req: pVvYoyAzvK8VYaHfrFA9SB is back
req: soP8NDxeQKYjgeT7pa3wtG is back
req: j3rcg5Lp59pQXuvdjCAyZe is back
req: NDhvBPqMiRbteFD5WqiLFE is back
got correct res for req: NDhvBPqMiRbteFD5WqiLFE
req: v2hAfcCEhRPUDUjCabk45N is back
got correct res for req: v2hAfcCEhRPUDUjCabk45N
req: porzHqMqV8SAuttteHRwNL is back
req: trVVqZrUpsW3tfjQajJfb7 is back
req: fpmk8yC3iQcgHAJBKqe2zh is back
got correct res for req: fpmk8yC3iQcgHAJBKqe2zh
Run Code Online (Sandbox Code Playgroud)