将同步请求包装到asyncio(async/await)?

Ugu*_*gur 9 python-3.x async-await python-asyncio

我在Python 3.6中编写了一个工具,它向几个API(带有各种端点)发送请求,并收集它们的解析来解析并将它们保存在数据库中.

我使用的API客户端具有请求URL 的同步版本,例如他们使用的

urllib.request.Request('...

或者他们使用Kenneth Reitz的Requests图书馆.

由于我的API调用依赖于请求URL的同步版本,因此整个过程需要几分钟才能完成.

现在我想将我的API调用包装在async/await(asyncio)中.我正在使用python 3.6.

我发现的所有示例/教程都要我将同步URL调用/ requests更改为异步版本(例如aiohttp).由于我的代码依赖于我未编写的API客户端(我无法更改),因此我需要保持该代码不受影响.

那么有没有办法将我的同步请求(阻塞代码)包装在async/await中以使它们在事件循环中运行?

我是Python新手中的asyncio.这在NodeJS中是不费脑子的.但是我不能用Python来解决这个问题.

Cla*_*ude 7

解决方案是将同步代码包装在线程中并以此方式运行.我使用那个确切的系统来asyncio运行我的代码boto3(注意:如果运行<python3.6,则删除内联类型提示):

async def get(self, key: str) -> bytes:
    s3 = boto3.client("s3")
    loop = asyncio.get_event_loop()
    try:
        response: typing.Mapping = \
            await loop.run_in_executor(  # type: ignore
                None, functools.partial(
                    s3.get_object,
                    Bucket=self.bucket_name,
                    Key=key))
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchKey":
            raise base.KeyNotFoundException(self, key) from e
        elif e.response["Error"]["Code"] == "AccessDenied":
            raise base.AccessDeniedException(self, key) from e
        else:
            raise
    return response["Body"].read()
Run Code Online (Sandbox Code Playgroud)

请注意,这将起作用,因为s3.get_object()代码中的大量时间花在等待I/O上,并且(通常)在等待I/O python发布GIL时(GIL是python中通常线程不是的原因)一个好主意).

第一个参数Nonerun_in_executor,我们在默认执行运行方式.这是一个线程池执行器,但它可以使事情更明确地在那里显式地分配线程池执行器.

请注意,在使用纯异步I/O的情况下,您可以轻松地同时打开数千个连接,使用线程池执行器意味着每个API的并发调用都需要一个单独的线程.一旦您的池中的线程用完,线程池将不会安排您的新调用,直到线程可用.显然你可以提高线程数,但这会占用内存; 不要指望能够超过几千.

另请参阅python ThreadPoolExecutor文档以获取解释以及如何在异步代码中包装同步调用的一些略有不同的代码.