lns*_*shi 5 python python-3.x async-await python-asyncio aiohttp
正如标题所说,我的用例是这样的:
我有一个 aiohttp 服务器,它接受来自客户端的请求,当我收到请求时,我为它生成一个唯一的请求 ID,然后我将{req_id: req_pyaload}dict发送给一些工作人员(工作人员不在 python 中,因此在另一个进程中运行),当工人完成的工作,我回来的响应,并把它们放到一个结果字典是这样的:{req_id_1: res_1, req_id_2: res_2}。
然后我希望我的 aiohttp 服务器处理程序await在上面result dict,所以当特定的响应变得可用时(通过 req_id)它可以将它发送回来。
下面的示例代码,我建立以尽量模拟过程中,却被困在执行协程async def fetch_correct_res(req_id)应异步/ unblockly获取由正确的响应req_id。
import random
import asyncio
import shortuuid
n_tests = 1000
idxs = list(range(n_tests))
req_ids = []
for _ in range(n_tests):
req_ids.append(shortuuid.uuid())
res_dict = {}
async def fetch_correct_res(req_id):
pass
async def handler(req):
res = await fetch_correct_res(req)
assert req == res, "the correct res for the req should exactly be the req itself."
print("got correct res for req: {}".format(req))
async def randomly_put_res_to_res_dict():
for _ in range(n_tests):
random_idx = random.choice(idxs)
await asyncio.sleep(random_idx / 1000)
res_dict[req_ids[random_idx]] = req_ids[random_idx]
print("req: {} is back".format(req_ids[random_idx]))
Run Code Online (Sandbox Code Playgroud)
所以:
是否可以使此解决方案起作用?如何?
如果上述解决方案是不可能的,那么对于这个使用 asyncio 的用例,正确的解决方案应该是什么?
非常感谢。
我现在能想到的唯一方法是:asyncio.Queue用预先分配的 id预先创建一些,然后为每个传入的请求分配一个队列,所以处理程序就await在这个队列上,当响应回来时我仅将它放入这个预先分配的队列中,在请求完成后,我收集队列以将其用于下一个传入请求。不是很优雅,但会解决问题。
小智 5
看看下面的示例实现是否满足您的需求
基本上你想以异步方式用你的响应(无法预测顺序)响应请求(id)
因此,在处理请求时,用{request_id: {'event':<async.Event>, 'result': <result>}}and awaiton填充字典asyncio.Event.wait(),一旦收到响应,就发出信号通知事件,该事件asyncio.Event.set()将释放等待,然后根据请求 id 从字典中获取响应
我稍微修改了你的代码,用请求 id 预先填充字典,并打开await直到asyncio.Event.wait()信号来自响应
import random
import asyncio
import shortuuid
n_tests = 10
idxs = list(range(n_tests))
req_ids = []
for _ in range(n_tests):
req_ids.append(shortuuid.uuid())
res_dict = {}
async def fetch_correct_res(req_id, event):
await event.wait()
res = res_dict[req_id]['result']
return res
async def handler(req, loop):
print("incoming request id: {}".format(req))
event = asyncio.Event()
data = {req :{}}
res_dict.update(data)
res_dict[req]['event']=event
res_dict[req]['result']='pending'
res = await fetch_correct_res(req, event)
assert req == res, "the correct res for the req should exactly be the req itself."
print("got correct res for req: {}".format(req))
async def randomly_put_res_to_res_dict():
random.shuffle(req_ids)
for i in req_ids:
await asyncio.sleep(random.randrange(2,4))
print("req: {} is back".format(i))
if res_dict.get(i) is not None:
event = res_dict[i]['event']
res_dict[i]['result'] = i
event.set()
loop = asyncio.get_event_loop()
tasks = asyncio.gather(handler(req_ids[0], loop),
handler(req_ids[1], loop),
handler(req_ids[2], loop),
handler(req_ids[3], loop),
randomly_put_res_to_res_dict())
loop.run_until_complete(tasks)
loop.close()
Run Code Online (Sandbox Code Playgroud)
上述代码的示例响应
incoming request id: NDhvBPqMiRbteFD5WqiLFE
incoming request id: fpmk8yC3iQcgHAJBKqe2zh
incoming request id: M7eX7qeVQfWCCBnP4FbRtK
incoming request id: v2hAfcCEhRPUDUjCabk45N
req: VeyvAEX7YGgRZDHqa2UGYc is back
req: M7eX7qeVQfWCCBnP4FbRtK is back
got correct res for req: M7eX7qeVQfWCCBnP4FbRtK
req: pVvYoyAzvK8VYaHfrFA9SB is back
req: soP8NDxeQKYjgeT7pa3wtG is back
req: j3rcg5Lp59pQXuvdjCAyZe is back
req: NDhvBPqMiRbteFD5WqiLFE is back
got correct res for req: NDhvBPqMiRbteFD5WqiLFE
req: v2hAfcCEhRPUDUjCabk45N is back
got correct res for req: v2hAfcCEhRPUDUjCabk45N
req: porzHqMqV8SAuttteHRwNL is back
req: trVVqZrUpsW3tfjQajJfb7 is back
req: fpmk8yC3iQcgHAJBKqe2zh is back
got correct res for req: fpmk8yC3iQcgHAJBKqe2zh
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1064 次 |
| 最近记录: |