我刚刚开始使用Python3.4中的asyncio库,并编写了一个小程序,试图同时获取50个网页.该程序在几百个请求之后以"太多打开文件"异常爆炸.
我认为我的fetch方法用'response.read_and_close()'方法调用关闭连接.
有什么想法在这里发生了什么?我是以正确的方式解决这个问题吗?
import asyncio
import aiohttp
@asyncio.coroutine
def fetch(url):
response = yield from aiohttp.request('GET', url)
response = yield from response.read_and_close()
return response.decode('utf-8')
@asyncio.coroutine
def print_page(url):
page = yield from fetch(url)
# print(page)
@asyncio.coroutine
def process_batch_of_urls(round, urls):
print("Round starting: %d" % round)
coros = []
for url in urls:
coros.append(asyncio.Task(print_page(url)))
yield from asyncio.gather(*coros)
print("Round finished: %d" % round)
@asyncio.coroutine
def process_all():
api_url = 'https://google.com'
for i in range(10):
urls = []
for url in range(50):
urls.append(api_url)
yield from process_batch_of_urls(i, urls) …Run Code Online (Sandbox Code Playgroud) 我找不到aiohttp与登录页面结合的工作代码.目标很简单:使用用户名和密码进行基于表单的身份验证,我希望在随后的aiohttp async fetch调用中使用该cookie.
似乎整个Session概念在版本之间的aiohttp中发生了变化,所以我很好奇如何在最新版本中实现它.我不确定如何获取cookie一次,然后在异步事件中使用它.
我真的很想看到一个完整的例子,因为不幸的是我无法使用我在任何地方找到的片段.
我想这可能是一个开始,但我不确定,我当然不知道如何将所有内容连接到它(我还需要一个aiohttp.TCPConnector吗?)
http://aiohttp.readthedocs.org/en/latest/ client_reference.html#aiohttp.client.ClientSession
使用mechanize在Python 2中我的非异步版本的示例(虽然我自然地使用Python 3进行asyncio等):
import mechanize
import urllib
class MyClass()
def __init__(self):
self.data = {'username' : 'me', 'password' : 'pw'}
self.login_url = 'http://example.com/login'
self.login()
def call(self, url):
request2 = mechanize.Request(url)
self.cookie_jar.add_cookie_header(request2)
response2 = mechanize.urlopen(request2).read()
return response2
def login(self):
request = mechanize.Request(self.login_url)
# 'username' and 'password' keys are actually the name of the <input>
logInfo = urllib.urlencode({'username' : self.data['username'],
'password' : self.data['password']})
response = mechanize.urlopen(request, data = logInfo) …Run Code Online (Sandbox Code Playgroud) 我正在尝试为以下异步编写pytest,等待方法,但我无处可去.
class UserDb(object):
async def add_user_info(self,userInfo):
return await self.post_route(route='users',json=userInfo)
async def post_route(self,route=None,json=None,params=None):
uri = self.uri + route if route else self.uri
async with self.client.post(uri,json=json,params=params) as resp:
assert resp.status == 200
return await resp.json()
Run Code Online (Sandbox Code Playgroud)
有人可以帮我弄这个吗?TIA
我有一些使用 aiohttp 的应用程序。
我将 POST 请求发送到适当的端点,例如:
POST mysite.com/someendpoind/
Run Code Online (Sandbox Code Playgroud)
数据类似于:
{"param1": "value1", "param2": "value2", ..., "paramn": None}
Run Code Online (Sandbox Code Playgroud)
然后在后端,我想在这个请求中添加一些额外的条件:
data = await request.json()
data["additional_conditional"] = True
Run Code Online (Sandbox Code Playgroud)
但request.json()失败并出现错误:
[ERROR] Error handling request
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/aiohttp/web_protocol.py", line 422, in start
resp = yield from self._request_handler(request)
File "/usr/local/lib/python3.5/dist-packages/aiohttp/web.py", line 306, in _handle
resp = yield from handler(request)
File "/usr/local/lib/python3.5/dist-packages/aiohttp_session/__init__.py", line 129, in middleware
response = yield from handler(request)
File "/opt/bikeamp/auth/__init__.py", line 57, in wrapped
return (yield from f(request, user)) …Run Code Online (Sandbox Code Playgroud) aiohttp入门文档提供了以下客户端示例:
async with aiohttp.ClientSession() as session:
async with session.get('https://api.github.com/events') as resp:
print(resp.status)
print(await resp.text())
Run Code Online (Sandbox Code Playgroud)
我无法理解何时response.status可用。我的理解是协程在await response.read()生产线上释放了控制权。在等待响应恢复之前如何访问状态?
我已经使用 aiohttp 编写了一个简单的 HTTP 客户端,我正在尝试通过修补aiohttp.ClientSession和aiohttp.ClientResponse. 但是,看起来unittest.mock.patch装饰器似乎不尊重我的异步代码。猜测,我会说这是某种命名空间不匹配。
这是一个最小的例子:
from aiohttp import ClientSession
async def is_ok(url:str) -> bool:
async with ClientSession() as session:
async with session.request("GET", url) as response:
return (response.status == 200)
Run Code Online (Sandbox Code Playgroud)
我正在使用异步装饰器进行测试,如本答案中所述。所以这是我尝试的测试:
import unittest
from unittest.mock import MagicMock, patch
from aiohttp import ClientResponse
from my.original.module import is_ok
class TestClient(unittest.TestCase):
@async_test
@patch("my.original.module.ClientSession", spec=True)
async def test_client(self, mock_client):
mock_response = MagicMock(spec=ClientResponse)
mock_response.status = 200
async def _mock_request(*args, **kwargs):
return mock_response
mock_client.request = mock_response
status …Run Code Online (Sandbox Code Playgroud) 作为一项学习练习,我试图修改aiohttp的快速入门示例,以使用单个ClientSession提取多个URL(文档建议通常应为每个应用程序创建一个ClientSession)。
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main(url, session):
print(f"Starting '{url}'")
html = await fetch(session, url)
print(f"'{url}' done")
urls = (
"https://python.org",
"https://twitter.com",
"https://tumblr.com",
"https://example.com",
"https://github.com",
)
loop = asyncio.get_event_loop()
session = aiohttp.ClientSession()
loop.run_until_complete(asyncio.gather(
*(loop.create_task(main(url, session)) for url in urls)
))
# session.close() <- this doesn't make a difference
Run Code Online (Sandbox Code Playgroud)
但是,在协程之外创建ClientSession显然不是可行的方法:
?python 1_async.py 1_async.py:30:用户警告:在协程之外创建客户端会话是一个非常危险的想法 会话= aiohttp.ClientSession() 在协程之外创建客户端会话 client_session: 开始'https://python.org' 开始“ https://twitter.com” 开始'https://tumblr.com' 开始“ https://example.com” 开始'https://github.com' 'https://twitter.com'完成 “ https://example.com”完成 'https://github.com'完成 …
我正在尝试制作一个可以存活一天、一周或更长时间的应用程序。在应用程序的生命周期中,它会向不同的 API 发出请求。其中一些 api 可能需要登录,因此我可以随时访问 cookie,这一点很重要。
所以我需要的是一个不同的 API 可以在不阻塞应用程序的情况下使用的文件。
我是异步编程(asyncio/aiohttp)的新手,我见过的示例展示了如何从 url 列表发出大量请求,但这不是我需要的。
我的代码的问题是,我得到 ClientSession is closed 错误或未关闭的 ClientSession 警告。
import asyncio # only here for debugging purposes
import aiohttp
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:61.0) Gecko/20100101 Firefox/61.1'
def default_headers():
header = {
'User-Agent': USER_AGENT
}
return header
class WebSession(object):
session = None
@classmethod
def create(cls):
cls.session = aiohttp.ClientSession()
return cls.session
@classmethod
def close(cls):
if cls.session is not None:
cls.session.close()
async def request(method, url, **kwargs):
if kwargs.get('headers', None) is …Run Code Online (Sandbox Code Playgroud) 我无法理解 aiohttp(以及一般的 asyncio)服务器实现没有提供一种方法来限制最大并发连接数限制(接受的套接字数或正在运行的请求处理程序数)的原因。(https://github.com/aio-libs/aiohttp/issues/675)。如果没有这个限制,很容易耗尽内存和/或文件描述符。
同时,aiohttp客户端默认限制并发请求数为100(https://docs.aiohttp.org/en/stable/client_advanced.html#limiting-connection-pool-size),aiojobs限制运行任务数和挂起任务列表的大小,nginx 有 worker_connections 限制,任何同步框架在设计上都受到工作线程数的限制。
虽然 aiohttp 可以处理很多并发请求,但这个数量仍然有限。aiojobs 上的文档说“调度程序隐含了并发作业数量的限制(默认为 100)。......它通过同时运行十亿个作业来防止程序溢出”。而且,我们仍然可以愉快地产生“十亿”(好吧,直到我们用完资源)aiohttp 处理程序。
所以问题是,为什么它会以这样的方式实施?我错过了一些重要的细节吗?我认为我们可以使用 Semafor 以某种方式暂停请求处理程序,但与 nginx 相比,aiohttp 仍然接受套接字并生成协程。同样在 nginx 后面部署时,worker_connections 和 aiohttp 所需的限制数量肯定会有所不同。(因为 nginx 也可能提供静态文件)
全部!
我需要向 Web 服务发出大约 10,000 个请求,我期望 JSON 响应。由于请求是相互独立的,我想并行运行它们。我认为aiohttp可以帮助我。我写了以下代码:
import asyncio
import aiohttp
async def execute_module(session: aiohttp.ClientSession, module_id: str,
post_body: dict) -> dict:
headers = {
'Content-Type': r'application/json',
'Authorization': fr'Bearer {TOKEN}',
}
async with session.post(
fr'{URL}/{module_id}/steps/execute',
headers=headers,
json=post_body,
) as response:
return await response.json()
async def execute_all(campaign_ids, post_body):
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*[
execute_module(session, campaign_id, post_body)
for campaign_id in campaign_ids
])
campaign_ids = ['101', '102', '103'] * 400
post_body = {'inputs': [{"name": "one", "value": 1}]}
print(asyncio.run(execute_all(campaign_ids, …Run Code Online (Sandbox Code Playgroud) aiohttp ×10
python ×7
python-3.x ×2
async-await ×1
http-post ×1
json ×1
patch ×1
pytest ×1
python-3.6 ×1
request ×1
testing ×1