I'm using a third-party API client library built on aiohttp. The library doesn't implement its client as a context manager, but as long as I always close the aiohttp.ClientSession manually, that should be fine (right?).
The problem is that even after doing this carefully, I still get an Unclosed client session warning, with no information about where it comes from. I'm capturing warning tracebacks with pytest -W error::ResourceWarning, but the result is simply
Exception ignored in: <bound method ClientSession.__del__ of <aiohttp.client.ClientSession object at 0x10fc15ba8>>
Traceback (most recent call last):
File "<PROJECT>/venv/lib/python3.6/site-packages/aiohttp/client.py", line 211, in __del__
**kwargs)
ResourceWarning: Unclosed client session <aiohttp.client.ClientSession object at 0x10fc15ba8>
In other words, a traceback with only a single frame.
Can someone tell me what I'm doing wrong here and how to get more information about that warning?
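A minimal sketch of one way to surface the missing frames (my own suggestion, not part of the original question): recent aiohttp versions attach the session object as the warning's source, so if tracemalloc is tracing, CPython appends an "Object allocated at" traceback that points at the line where the session was created.

# enable allocation tracking before the session is created, e.g. via the
# environment when running the tests:
#   PYTHONTRACEMALLOC=25 pytest -W error::ResourceWarning
# or programmatically, e.g. near the top of conftest.py:
import tracemalloc

tracemalloc.start(25)  # keep up to 25 frames per allocated object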
I have the following code:
session = aiohttp.ClientSession()

async def fetch(session, url):
    while True:
        try:
            async with session.get(url) as response:
                assert response.status == 200
                return await response.json()
        except Exception as error:
            print(error)

class FPL():
    async def get_player_summaries(self, player_ids=[], return_json=False):
        tasks = [asyncio.ensure_future(
            fetch(session, API_URLS["player"].format(player_id)))
            for player_id in player_ids]
        player_summaries = await asyncio.gather(*tasks)
        if return_json:
            return player_summaries
        return [PlayerSummary(player_summary)
                for player_summary in player_summaries]

    async def get_points_against(self):
        players = await self.get_players(return_json=True)
        player_ids = [player["id"] for player in players]
        player_summaries = await self.get_player_summaries(
            player_ids, return_json=True)
        points_against = …

I tried using aioresponses to mock a ClientSession POST request, but I cannot connect to the host server. I'm only using the aiohttp client, not a server. Also, I don't want to pass a fake session as a parameter into the function under test. Is there a way to do this?
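A minimal sketch of how aioresponses is typically used, assuming a hypothetical fetch_token coroutine and URL (neither is from the original question): the aioresponses() context manager patches aiohttp's request machinery, so the code under test can keep creating its own ClientSession and no fake session needs to be passed in.

import asyncio
import aiohttp
from aioresponses import aioresponses

# hypothetical code under test: builds its own session, posts, returns JSON
async def fetch_token(url):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data={"user": "x"}) as resp:
            return await resp.json()

def test_fetch_token():
    with aioresponses() as mocked:
        # register the fake response that will be served for this POST
        mocked.post("https://example.com/auth", status=200, payload={"token": "abc"})
        result = asyncio.get_event_loop().run_until_complete(
            fetch_token("https://example.com/auth"))
    assert result == {"token": "abc"}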
I'm trying to fetch data from a website using asyncio in Python. As an example I used this code (under "A Better Coroutine Example"): https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/
This works fine, but it writes the binary chunks to a file, and I don't want them in a file; I want the resulting data directly. At the moment, though, I end up with a list of coroutine objects and I can't get the data out of them.
Code:
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
import async_timeout

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [fetch(session, url) for url in urls]
        await asyncio.gather(*tasks)
        return tasks

# time normal way of retrieval
if __name__ == '__main__':
    urls = [a list of urls..]
    loop = asyncio.get_event_loop()
    details_async = loop.run_until_complete(main(loop, urls))
Thanks
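A minimal sketch of what appears to resolve this, keeping the rest of the code above unchanged: asyncio.gather already produces the response texts in order, so main should return the gathered results rather than the list of coroutine objects.

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [fetch(session, url) for url in urls]
        # gather returns the fetch results in order; return those,
        # not the already-consumed coroutine objects
        results = await asyncio.gather(*tasks)
        return results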
I want to create a function that downloads from websites asynchronously. I need to tie each download result to its input parameters, so that after the download I can use both the result and the parameters.
I currently have the following:
async def download(session, url, var1, var2):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            return await (response.read(), url, var1, var2)

async def loop_download(loop, urls, var1s, var2s):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download(session, url, var1, var2) for url, var1, var2 in zip(urls, var1s, var2s)]
        results = await asyncio.gather(*tasks)
        return results

loop = asyncio.get_event_loop()
results = loop.run_until_complete(loop_download(loop, urls, var1s, var2s))
However, this returns an error:
TypeError: object tuple can't be used in 'await' expression
How can I attach some of the input data (such as the URL) to the result so that I can use it for further analysis?
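A minimal sketch of the likely fix: await only the response body, then build an ordinary tuple around the awaited value, instead of awaiting the tuple itself (which is what triggers the TypeError above).

async def download(session, url, var1, var2):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            body = await response.read()    # await the coroutine, not the tuple
            return body, url, var1, var2    # plain tuple pairing the result with its inputs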
I'm running into some problems with my code. I have an aiohttp client session that communicates with a website through requests.
The problem is that when I run the code for a long time, I start getting errors such as ClientResponseError, ServerDisconnectedError, and Error 101. So I was reading the documentation and came across this:
release()
Release the connection back to the connector.
The underlying socket is not closed, and the connection may be reused later if the connection's timeout (30 seconds by default) has not expired.
But I don't understand it. Can someone explain it in simple terms? Will it solve my problem?
session = aiohttp.ClientSession(cookie_jar=cookiejar)

while True:
    await session.post('https://anywhere.com', data={'{}': ''})
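A minimal sketch, assuming the same endpoint as above, of how the response is usually consumed so the connection is released back to the connector automatically, plus a retry when the server drops the keep-alive socket. This is one common pattern, not necessarily a complete fix for every error listed above.

import asyncio
import aiohttp

async def post_forever(session):
    while True:
        try:
            # 'async with' reads and releases the response, returning the
            # connection to the connector so it can be reused
            async with session.post('https://anywhere.com', data={'{}': ''}) as resp:
                await resp.text()
        except aiohttp.ServerDisconnectedError:
            # the keep-alive socket was closed by the server; wait briefly and
            # retry, letting the connector open a fresh connection
            await asyncio.sleep(1)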
This is my first attempt at asyncio and aiohttp. I have the following code that fetches urls from a MySQL database, makes GET requests to them, and pushes the responses back into the MySQL database.
if __name__ == "__main__":
    database_name = 'db_name'
    company_name = 'company_name'
    my_db = Db(database=database_name)  # wrapper class for mysql.connector
    urls_dict = my_db.get_rest_api_urls_for_specific_company(company_name=company_name)
    update_id = my_db.get_updateid()
    my_db.get_connection(dictionary=True)
    for url in urls_dict:
        url_id = url['id']
        url = url['url']
        table_name = my_db.make_sql_table_name_by_url(url)
        insert_query = my_db.get_sql_for_insert(table_name)
        r = requests.get(url=url).json()  # make the request
        args = [json.dumps(r), update_id, url_id]
        my_db.db_execute_one(insert_query, args, close_conn=False)
    my_db.close_conn()
This works fine, but how do I run it asynchronously to speed it up?
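A minimal sketch of one way to do that, assuming the same Db wrapper as above and that only the HTTP requests are made concurrent while the database writes stay sequential:

import asyncio
import json
import aiohttp

async def fetch_json(session, url):
    # one GET request, decoded as JSON
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls_dict):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, url['url']) for url in urls_dict]
        # run all GET requests concurrently; results come back in input order
        return await asyncio.gather(*tasks)

# inside __main__, replacing the requests loop:
responses = asyncio.get_event_loop().run_until_complete(fetch_all(urls_dict))
for url, r in zip(urls_dict, responses):
    table_name = my_db.make_sql_table_name_by_url(url['url'])
    insert_query = my_db.get_sql_for_insert(table_name)
    args = [json.dumps(r), update_id, url['id']]
    my_db.db_execute_one(insert_query, args, close_conn=False)
my_db.close_conn()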
import time
import asyncio
import aiohttp

async def is_name_available(s, name):
    async with s.get("https://twitter.com/%s" % name) as res:
        if res.raise_for_status == 404:
            print('%s is available!' % name)
            return name

async def check_all_names(names):
    async with aiohttp.ClientSession(raise_for_status=True) as s:
        tasks = []
        for name in names:
            task = asyncio.create_task(is_name_available(s, name))
            tasks.append(task)
        return await asyncio.gather(*tasks)

def main():
    with open('names.txt') as in_file, open('available.txt', 'w') as out_file:
        names = [name.strip() for name in in_file]
        start_time = time.time()
        results = asyncio.get_event_loop().run_until_complete(check_all_names(names))
        results = [i for i in …
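The question above is cut off, but a hedged note on the visible code: res.raise_for_status is a method object, so comparing it to 404 is always False, and ClientSession(raise_for_status=True) makes every 404 raise before that check ever runs. A minimal sketch of the check rewritten around res.status, with raise_for_status left off the session:

async def is_name_available(s, name):
    # the session is created without raise_for_status=True,
    # so a 404 response can be inspected here instead of raising
    async with s.get("https://twitter.com/%s" % name) as res:
        if res.status == 404:
            print('%s is available!' % name)
            return name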