我正在制作一个用于与 API 交互的 python 模块。我希望它很快,所以我选择使用 asyncio 和Aiohttp。我对异步编程很陌生,我不太确定如何为每个请求重用相同的会话。另外,我想让我的最终用户免去创建循环等的麻烦。我为我的基本客户端想出了这个类:
import asyncio
import aiohttp
class BaseClient:
API_BASE_URL = "dummyURL"
API_VERSION = 3
async def __aenter__(self):
self._session = aiohttp.ClientSession(raise_for_status=True)
return self
async def __aexit__(self, exc_type, exc, tb):
await self._session.close()
#remove the next line when aiohttp 4.0 is released
await asyncio.sleep(0.250)
async def _get(self, endpoint: str) -> None:
url = f"{self.API_BASE_URL}/{endpoint}/?v={self.API_VERSION}"
async with self._session.get(url) as resp:
json_body = await resp.json()
return json_body
async def list_forums(self):
endpoint = "forums"
return await self._get(endpoint)
async def …Run Code Online (Sandbox Code Playgroud)