Keep getting 'asyncio.exceptions.TimeoutError'

ah2*_*ise 5 python python-asyncio aiohttp

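The example below adds only one element to the event loop, using the variable asins.

When the asins argument below contains 180 elements or more, I get an asyncio.exceptions.TimeoutError.

If I build a list containing any single one of those 180 elements, I get a successful response, which tells me the problem below is not with the API itself.

Can anyone tell me how to fix this? Thank you!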

import asyncio
import aiohttp
import sys
import pandas as pd

def create_params(asins_set):
    params = []
    for asin in asins_set:
        param = {
            'api_key': '...',
            'type': 'product',
            'amazon_domain': 'amazon.com',
            'asin': asin,
        }
        params.append(param)
    return params

if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# creates a list of tasks to add to the event loop at once
def get_tasks(session, params):
    tasks = []
    for param in params:
        tasks.append(session.get(
            'https://api.rainforestapi.com/request',
            params = param
        ))
    return tasks

results = []
async def get_suggested(params):
    async with aiohttp.ClientSession() as session:
        tasks = get_tasks(session, params)
        responses = await asyncio.gather(*tasks)
        for response in responses:
            results.append(await response.json())
        return results

def get_asin_titles(asins_set):
    params = create_params(asins_set)
    r = asyncio.run(get_suggested(params))
    asins_and_titles = dict()
    for result in r:
        if result['request_info']['success'] == True:
            asin = result['request_parameters']['asin']
            title = result['product']['title']
            asins_and_titles[asin] = title
    return asins_and_titles

asins = ['b07wp7q5bf']

final = get_asin_titles(asins)
print(final)

Eug*_*nij 7

The problem is probably the large number of concurrent HTTP connections. Here you try to run all 180 HTTP requests at once:

responses = await asyncio.gather(*tasks)

So you need to limit the number of HTTP requests running at the same time. You can do that with asyncio.Semaphore:

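A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some task calls release().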
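As a minimal sketch of that counter behavior (not part of the original answer), the snippet below acquires a two-slot semaphore twice and checks its state with locked(). The function name counter_demo and the limit of 2 are assumptions chosen for illustration.

```python
import asyncio

async def counter_demo():
    # Hypothetical limit of 2, chosen only to keep the demo short.
    sem = asyncio.Semaphore(2)

    await sem.acquire()            # counter: 2 -> 1
    await sem.acquire()            # counter: 1 -> 0
    exhausted = sem.locked()       # True: a further acquire() would have to wait

    sem.release()                  # counter: 0 -> 1
    available = sem.locked()       # False: one slot is free again
    sem.release()                  # counter: 1 -> 2

    return exhausted, available

if __name__ == "__main__":
    print(asyncio.run(counter_demo()))
```

Writing "async with sem:" performs the same acquire() on entry and release() on exit, including when the body raises.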

So your code will look like this:

...
sem = asyncio.Semaphore(10)

async def perform_request(session, param):
    async with sem:
        return await session.get(
            'https://api.rainforestapi.com/request',
            params = param
        )

def get_tasks(session, params):
    tasks = []
    for param in params:
        tasks.append(perform_request(session, param))
    return tasks

...

Now, if you run responses = await asyncio.gather(*tasks), at most 10 HTTP requests will be in flight at any one time.

You can tune the number 10 to match the API's rate limit. With sem = asyncio.Semaphore(1), all requests are executed sequentially, one after another.
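To check the effect of the limit without calling the real API, here is a small sketch that replaces the HTTP call with asyncio.sleep and records how many "requests" are in flight at once. The names fake_request, measure_peak and peak_concurrency, as well as the 0.01 s delay, are hypothetical stand-ins and not part of the original code.

```python
import asyncio

async def fake_request(sem, state):
    # Stand-in for perform_request(): holds a semaphore slot while "working".
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated network I/O (assumed duration)
        state["active"] -= 1

async def measure_peak(limit, n_requests=30):
    # The semaphore is created inside the running loop; on older Python
    # versions this avoids binding it to a different event loop.
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(fake_request(sem, state) for _ in range(n_requests)))
    return state["peak"]

def peak_concurrency(limit):
    # Highest number of simulated requests that ran at the same time.
    return asyncio.run(measure_peak(limit))
```

Calling peak_concurrency(10) and peak_concurrency(1) should report the limit itself as the peak, which matches the "10 at a time" and "one after another" behavior described above.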

Maybe that is still too fast for this API. In that case, you can add a sleep to reduce the requests per second (RPS) further:

async def perform_request(session, param):
    async with sem:
        await asyncio.sleep(1)
        return await session.get(
            'https://api.rainforestapi.com/request',
            params = param
        )
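Because each slot is held for at least the sleep duration, a semaphore of size N combined with a sleep of D seconds caps throughput at roughly N / D requests per second. The sketch below illustrates this with simulated requests only; throttled_request, run_batch and the numbers used are assumptions for demonstration, and the returned index stands in for the real session.get(...) call.

```python
import asyncio
import time

async def throttled_request(sem, delay, index):
    async with sem:
        await asyncio.sleep(delay)  # pause while still holding the slot
        return index                # placeholder for the real HTTP response

async def run_batch(limit, delay, n_requests):
    sem = asyncio.Semaphore(limit)
    start = time.perf_counter()
    # gather() returns results in the order the awaitables were passed in.
    results = await asyncio.gather(
        *(throttled_request(sem, delay, i) for i in range(n_requests))
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    # 20 requests, 5 at a time, 0.05 s each: about 4 waves of 0.05 s.
    results, elapsed = asyncio.run(run_batch(limit=5, delay=0.05, n_requests=20))
    print(f"{len(results)} requests in {elapsed:.2f}s")
```

With limit=10 and a 1-second sleep, as in the answer's code, 180 requests would therefore take at least about 18 seconds.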