协同之外的Aiohttp ClientSession

Gus*_*rra 10 python multithreading python-asyncio aiohttp

我有一个REST API包装器,应该在交互式Python会话中运行.HTTP请求既可以通过自动后台线程(使用API​​包装器),也可以由最终用户通过交互式会话手动完成.我试图将所有HTTP请求管理从前一个新的每线程请求方法迁移到asyncio,但由于我无法在主线程中运行asyncio循环(它必须是免费的ad-hoc Python命令/请求),我写了以下内容在后台线程中运行它:

import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor

def start_thread_loop(pool=None):
    """Starts thread with running loop, bounding the loop to the thread"""
    def init_loop(loop):
        asyncio.set_event_loop(loop)  # bound loop to thread
        loop.run_forever()
    _pool = ThreadPoolExecutor() if pool is None else pool
    loop = asyncio.new_event_loop()
    future = _pool.submit(init_loop, loop)
    return future, loop

def send_to_loop(coro, loop):
    """Wraps couroutine in Task object and sends it to given loop"""
    return asyncio.run_coroutine_threadsafe(coro, loop=loop)
Run Code Online (Sandbox Code Playgroud)

实际的API包装器类似于以下内容:

class Foo:
    def __init__(self):
        _, self.loop = start_thread_loop()
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.loop.set_debug(True)

    def send_url(self, url):
        async def _request(url):
            print('sending request')
            async with self.session.get(url) as resp:
                print(resp.status)
        return send_to_loop(_request(url), self.loop)
Run Code Online (Sandbox Code Playgroud)

然而,aiohttp强烈建议不要做一个ClientSession 协程之外,并开启asyncio调试模式初始化之前ClientSession提出了一个RuntimeError.因此,我尝试使用稍微不同的版本asycio.Queue,以避免ClientSession在协同程序内部:

class Bar:

    def __init__(self):
        _, self.loop = start_thread_loop()
        self.q = asyncio.Queue(loop=self.loop)
        self.status = send_to_loop(self.main(), loop=self.loop)

    async def main(self):
        async with aiohttp.ClientSession(loop=self.loop) as session:
            while True:
                url = await self.q.get()
                print('sending request')
                asyncio.ensure_future(self._process_url(url, session), loop=self.loop)

    def send_url(self, url):
        send_to_loop(self.q.put(url), loop=self.loop)

    @staticmethod
    async def _process_url(url, session):
        async with session.get(url) as resp:
            print(resp.status)
Run Code Online (Sandbox Code Playgroud)

但是,这种方法更加复杂/冗长,我真的不明白它是否真的有必要.

问题:

  1. 为什么ClientSession在协程外面开始出现问题?
  2. 队列方法更好/更安全吗?如果是这样,为什么?
  3. 我在后台线程中启动循环的方法有什么问题吗?

ami*_*che 4

为什么在协程之外启动 ClientSession 会出现问题?

这就是 aiohttp 的构建方式,理论上应该可以在循环之外初始化某种客户端会话,即。在协程之外,但这不是 aiohttp 的构建方式。AFAIU 在引入此警告的问题中,这是因为 a) 很难测试 b) 它很容易出错

队列方法更好/更安全吗?如果是这样,为什么?

我不明白你想达到什么目的,所以我不知道如何回答。也许您遇到的问题是您尝试在ClientSession构造函数内部初始化 a 又名。__init__另一个班级的。在这种情况下,您应该通过创建一个辅助方法来解决该问题,该方法是一个协程,将完成类的初始化。这是使用异步代码时的已知模式。

我在后台线程内启动循环的方法有什么问题吗?

完全没问题。

  • 总之,只要循环**显式传递**到`ClientSession`构造函数,就不会引发警告,即使实例化发生在协程之外。使用“aiohttp”v2.2.0 运行问题中的代码不会引发任何警告。如果您将此信息添加到您的答案中,我将标记为已接受。再次感谢。 (3认同)
  • 请自己回答这个问题,我不确定是否完全了解情况。谢谢! (2认同)