jle*_*ham 10 python-3.x python-trio
我编写了一个脚本,它使用托儿所和 asks 模块循环并根据循环变量调用 API。我收到回复,但不知道如何像使用 asyncio 一样返回数据。
我还有一个关于将 API 限制为每秒 5 个的问题。
from datetime import datetime
import asks
import time
import trio
asks.init("trio")
s = asks.Session(connections=4)
async def main():
start_time = time.time()
api_key = 'API-KEY'
org_id = 'ORG-ID'
networkIds = ['id1','id2','idn']
url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, url.format(i), headers)
print("Total time:", time.time() - start_time)
async def fetch(url, headers):
print("Start: ", url)
response = await s.get(url, headers=headers)
print("Finished: ", url, len(response.content), response.status_code)
if __name__ == "__main__":
trio.run(main)
Run Code Online (Sandbox Code Playgroud)
当我运行nursery.start_soon(fetch...) 时,我正在fetch 中打印数据,但是如何返回数据?我没有看到任何类似于 asyncio.gather(*tasks) 函数的东西。
此外,我可以将会话数限制为 1-4,这有助于低于每秒 5 个 API 的限制,但想知道是否有内置方法来确保在任何给定的秒内调用的 API 不超过 5 个?
返回数据:将 networkID 和 dict 传递给fetch任务:
async def main():
…
results = {}
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, url.format(i), headers, results, i)
## results are available here
async def fetch(url, headers, results, i):
print("Start: ", url)
response = await s.get(url, headers=headers)
print("Finished: ", url, len(response.content), response.status_code)
results[i] = response
Run Code Online (Sandbox Code Playgroud)
或者,创建一个trio.Queue你put的结果;然后您的主要任务可以从队列中读取结果。
API 限制:按照以下方式创建trio.Queue(10)并启动任务:
async def limiter(queue):
while True:
await trio.sleep(0.2)
await queue.put(None)
Run Code Online (Sandbox Code Playgroud)
将该队列fetch作为另一个参数传递给,并await limit_queue.get()在每次 API 调用之前调用。
从技术上讲,trio.Queue它已在 trio 0.9 中弃用。它已被替换为trio.open_memory_channel.
简短的例子:
sender, receiver = trio.open_memory_channel(len(networkIds)
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, sender, url.format(i), headers)
async for value in receiver:
# Do your job here
pass
Run Code Online (Sandbox Code Playgroud)
在你的fetch函数中你应该在某个地方调用async sender.send(value)。
根据这个答案,您可以定义以下函数:
async def gather(*tasks):
async def collect(index, task, results):
task_func, *task_args = task
results[index] = await task_func(*task_args)
results = {}
async with trio.open_nursery() as nursery:
for index, task in enumerate(tasks):
nursery.start_soon(collect, index, task, results)
return [results[i] for i in range(len(tasks))]
Run Code Online (Sandbox Code Playgroud)
然后,您可以通过简单地修补 trio(添加收集函数)来以与 asyncio 完全相同的方式使用 trio:
import trio
trio.gather = gather
Run Code Online (Sandbox Code Playgroud)
这是一个实际的例子:
async def child(x):
print(f"Child sleeping {x}")
await trio.sleep(x)
return 2*x
async def parent():
tasks = [(child, t) for t in range(3)]
return await trio.gather(*tasks)
print("results:", trio.run(parent))
Run Code Online (Sandbox Code Playgroud)
当我运行 Nursery.start_soon(fetch...) 时,我在 fetch 中打印数据,但如何返回数据?我没有看到任何类似于 asyncio.gather(*tasks) 函数的内容。
你问了两个不同的问题,所以我只回答这一个问题。马蒂亚斯已经回答了你的另一个问题。
当您调用 时start_soon(),您是在要求 Trio 在后台运行该任务,然后继续。这就是 Trio 能够fetch()同时运行多次的原因。但由于 Trio 不断运行,因此无法像 Python 函数通常那样“返回”结果。它甚至会回到哪里?
您可以使用队列让fetch()任务将结果发送到另一个任务以进行额外处理。
创建队列:
response_queue = trio.Queue(capacity=len(networkIds))
Run Code Online (Sandbox Code Playgroud)
当您开始获取任务时,将队列作为参数传递,并在完成后将哨兵发送到队列:
async with trio.open_nursery() as nursery:
for i in networkIds:
nursery.start_soon(fetch, url.format(i), headers, response_queue)
await response_queue.put(None)
Run Code Online (Sandbox Code Playgroud)
下载 URL 后,将响应放入队列中:
async def fetch(url, headers, response_queue):
print("Start: ", url)
response = await s.get(url, headers=headers)
# Add responses to queue
await response_queue.put(response)
print("Finished: ", url, len(response.content), response.status_code)
Run Code Online (Sandbox Code Playgroud)
通过上述更改,您的获取任务会将响应放入队列中。现在您需要从队列中读取响应,以便可以处理它们。您可以添加一个新函数来执行此操作:
async def process(response_queue):
async for response in response_queue:
if response is None:
break
# Do whatever processing you want here.
Run Code Online (Sandbox Code Playgroud)
您应该在启动任何获取任务之前将此处理函数作为后台任务启动,以便它在收到响应后立即处理响应。
请阅读Trio 文档的“任务之间的同步和通信”部分来了解更多信息。
| 归档时间: |
|
| 查看次数: |
1863 次 |
| 最近记录: |