给定 5 万个网站 url 的列表,我的任务是找出其中哪些是可用的/可访问的。这个想法只是向HEAD每个 URL发送一个请求并查看状态响应。从我听到一个异步方法是要去的地方,现在我使用的是asyncio用aiohttp。
我想出了以下代码,但速度非常糟糕。在我的 10 兆位连接上,1000 个 URL 大约需要 200 秒。我不知道期望的速度是多少,但我是 Python 异步编程的新手,所以我想我在某个地方走错了地方。如您所见,我已尝试将允许的同时连接数增加到 1000(从默认值 100 增加)以及 DNS 解析保留在缓存中的持续时间;都没有什么大的影响。环境有 Python 3.6 和 aiohttp3.5.4。
与问题无关的代码审查也受到赞赏。
import asyncio
import time
from socket import gaierror
from typing import List, Tuple
import aiohttp
from aiohttp.client_exceptions import TooManyRedirects
# Using a non-default user-agent seems to avoid lots of 403 (Forbidden) errors
HEADERS = {
'user-agent': ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) '
'AppleWebKit/537.36 (KHTML, like …Run Code Online (Sandbox Code Playgroud) 我试图异步发送一些多部分编码的表单数据作为发布请求,主要是一个文件和其他两个字段。
在尝试使用 asyncio 之前,我正在使用 requests-toolbelt MultipartEncoder( https://github.com/requests/toolbelt )同步执行该过程,这对于普通请求非常有效,但在使用 aiohttp 进行异步时不起作用。aiohttp 提供了 2 个多部分类,一个FormData()类和一个MultipartWriter()类,这两个类都没有给我带来很大的成功。
经过一些测试,似乎不同之处在于,当我使用工具带时MultipartEncoder(),请求会form在发布请求的部分按原样发送数据。但是,当使用 aiohttp 时,请求被放入请求的body部分。不知道为什么他们的行为不同
def multipartencode() -> ClientResponse():
# Using MultipartEncoder
m = MultipartEncoder(
fields={'type': type_str,
'metadata': json.dumps(metadata),
'file': (filename, file, 'application/json')}
)
# Using FormData
data = FormData()
data.add_field('file', file, filename=filename,
content_type='multipart/form-data')
data.add_field('type', type_str, content_type='multipart/form-data')
data.add_field('metadata', json.dumps(metadata),
content_type='multipart/form-data')
# Using MultipartWriter
with MultipartWriter('multipart/form-data') as mpwriter:
part = mpwriter.append(
file, {'CONTENT-TYPE': 'multipart/form-data'})
part.set_content_disposition('form-data')
part = mpwriter.append_form([('type', type_str)]) …Run Code Online (Sandbox Code Playgroud) python multipartform-data python-requests python-asyncio aiohttp
这个 Web 教程和这个 SO answer建议使用信号量来限制使用 aiohttp 发出的并发请求的数量。
我很困惑,因为aiohttp它自己提供了一个工具来限制并发连接的数量(limit和limit_per_host,记录在这里) - 那么使用信号量不是重新发明轮子吗?
也许不是。每个连接可以有多个并发请求吗?根据这篇维基百科文章和这个 SO answer,似乎可以。因此,根据我链接到限制并发连接的文档,设置limit和/或limit_per_hostin可能没有限制并发请求的效果。aiohttp
我仍然很困惑,因为如果是这种情况,这些参数aiohttp提供的用途是什么?为什么用户想要限制连接而不是请求?但是这样的推理当然不需要任何东西,所以我准备继续前进并使用信号量。
然后我偶然发现了这个问题。它有两个相对高度赞成的答案。这些答案之一再次建议使用信号量。但另一个答案建议使用这些aiohttp设施limit和limit_per_host. 如果这个答案是正确的,那么限制连接在aiohttp也限制了请求-所以没有信号量是必要的(除非还希望限制每秒请求的速率,这是不是我在这里抢断)
这就是我想在这个问题中提出的问题。是否限制并发连接,在aiohttp通过limit和/或limit_per_host也限制并发请求?我想答案取决于aiohttp每个请求是否只使用一个连接,我也不知道。
难道aiohttp只使用每个请求一个连接?限制并发连接是否也限制并发请求?
正如标题所说,我的用例是这样的:
我有一个 aiohttp 服务器,它接受来自客户端的请求,当我收到请求时,我为它生成一个唯一的请求 ID,然后我将{req_id: req_pyaload}dict发送给一些工作人员(工作人员不在 python 中,因此在另一个进程中运行),当工人完成的工作,我回来的响应,并把它们放到一个结果字典是这样的:{req_id_1: res_1, req_id_2: res_2}。
然后我希望我的 aiohttp 服务器处理程序await在上面result dict,所以当特定的响应变得可用时(通过 req_id)它可以将它发送回来。
下面的示例代码,我建立以尽量模拟过程中,却被困在执行协程async def fetch_correct_res(req_id)应异步/ unblockly获取由正确的响应req_id。
import random
import asyncio
import shortuuid
n_tests = 1000
idxs = list(range(n_tests))
req_ids = []
for _ in range(n_tests):
req_ids.append(shortuuid.uuid())
res_dict = {}
async def fetch_correct_res(req_id):
pass
async def handler(req):
res = await fetch_correct_res(req)
assert req == res, "the correct res for the req should exactly …Run Code Online (Sandbox Code Playgroud) 想象一下,我有一个基于 Aiohttp 的 Web 应用程序:
from aiohttp import web
import asyncio
import logging
logger = logging.getLogger(__name__)
async def hello(request):
logger.info('Started processing request')
await asyncio.sleep(1)
logger.info('Doing something')
await asyncio.sleep(1)
return web.Response(text="Hello, world!\n")
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(name)-14s %(levelname)s: %(message)s')
app = web.Application()
app.add_routes([web.get('/', hello)])
web.run_app(app)
Run Code Online (Sandbox Code Playgroud)
它的输出是(例如):
2019-11-11 13:37:14,757 __main__ INFO: Started processing request
2019-11-11 13:37:14,757 __main__ INFO: Started processing request
2019-11-11 13:37:15,761 __main__ INFO: Doing something
2019-11-11 13:37:15,761 __main__ INFO: Doing something
2019-11-11 13:37:16,765 aiohttp.access INFO: 127.0.0.1 [11/Nov/2019:12:37:14 +0000] "GET / HTTP/1.1" …Run Code Online (Sandbox Code Playgroud) 我正在使用 asyncio 和 aiohttp 来发出并发请求。我最近将 Python 升级到了 3.8.0 版,并且RuntimeError: Event loop is closed在程序运行后得到了一个。
import asyncio
import aiohttp
async def do_call(name, session):
async with session.get('https://www.google.be') as response:
await response.text()
return 'ok - {}'.format(name)
async def main():
async with aiohttp.ClientSession() as session:
tasks = [do_call(str(i), session) for i in range(0,4)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
我确实从 asyncio.gather() 得到了一个有效的结果,但是在退出时会引发异常。我想更改代码,以免遇到异常。
回溯如下:
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x000001E9A92079D0>
Traceback (most recent call last):
File "C:\Users\Jonas\AppData\Local\Programs\Python\Python38\lib\asyncio\proactor_events.py", line 116, in __del__
self.close()
File …Run Code Online (Sandbox Code Playgroud) 场景:我需要从 Web 应用程序的 API 收集分页数据,该 API 的调用限制为每分钟 100 次。我需要返回的 API 对象每页包含 100 个项目,总共有 105 个页面,而且还在不断增加(总共约 10,500 个项目)。同步代码需要大约 15 分钟来检索所有页面,因此那时不必担心达到调用限制。但是,我想加快数据检索速度,因此我使用asyncio和实现了异步调用aiohttp。数据现在在 15 秒内下载 - 很好。
问题:我现在达到了呼叫限制,因此在最近 5 次左右的呼叫中收到 403 错误。
建议的解决方案我实现了try/except在get_data()函数中找到的。我拨打电话,然后当由于403: Exceeded call limit我退后back_off几秒钟而未成功拨打电话并重试retries多次时:
async def get_data(session, url):
retries = 3
back_off = 60 # seconds to try again
for _ in range(retries):
try:
async with session.get(url, headers=headers) as response:
if response.status != 200:
response.raise_for_status() …Run Code Online (Sandbox Code Playgroud) 我在做什么
我正在通过构建一个 REST api 来学习 aiohttp,我正在使用 Pytest(及其 async 和 aiohttp 插件)进行测试。
对于我的第一次测试(我从一开始就使用 TDD)我有以下代码:
@pytest.mark.asyncio
async def test_handle_user_create(
aiohttp_client, init_test_app, create_test_user_table
):
payload = {
"email": "tintin@gmail.com",
"username": "Tintin",
"password": "y0u != n00b1e",
}
client = await aiohttp_client(init_test_app)
resp = await client.post("/users/", json=payload)
...
Run Code Online (Sandbox Code Playgroud)
aiohttp_client 是客户端装置来自 pytest-aiohttpinit_test_app 是一个固定装置,它基本上反映了我将要构建的应用程序create_test_user_table 是我在测试数据库中为用户创建表的夹具它有什么问题
我的第一个测试是在上面代码块的最后一行抛出以下运行时错误:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ …Run Code Online (Sandbox Code Playgroud) 不删除,因为它有 httpx 的示例代码。
我正在尝试利用asyncio并行化几个长时间运行的 Web 请求。因为我是从requests库中迁移过来的,所以我想使用这个httpx库,因为它有类似的 API。我的环境是 Python 3.7.7 Anaconda 发行版,其中安装了所有必需的软件包(Windows 10)。
然而,尽管能够httpx用于同步 Web 请求(或用于串行执行一个接一个运行的异步请求),但我无法成功地一次运行多个异步请求,尽管使用aiohttp库很容易做到这一点.
这是在aiohttp: (请注意,我在 Jupyter 中运行,所以我已经有一个事件循环,因此缺少asyncio.run().
import aiohttp
import asyncio
import time
import httpx
async def call_url(session):
url = "https://services.cancerimagingarchive.net/services/v3/TCIA/query/getCollectionValues"
response = await session.request(method='GET', url=url)
#response.raise_for_status()
return response
for i in range(1,5):
start = time.time() # start time for timing event
async with aiohttp.ClientSession() as session: #use aiohttp
#async with httpx.AsyncClient as …Run Code Online (Sandbox Code Playgroud) 我需要抓取并获取许多(每天 5-10k)新闻文章的正文段落的原始文本。我已经编写了一些线程代码,但考虑到这个项目的高度 I/O 限制性质,我正在涉足asyncio. 下面的代码片段并不比 1 线程版本快,而且比我的线程版本差得多。谁能告诉我我做错了什么?谢谢你!
async def fetch(session,url):
async with session.get(url) as response:
return await response.text()
async def scrape_urls(urls):
results = []
tasks = []
async with aiohttp.ClientSession() as session:
for url in urls:
html = await fetch(session,url)
soup = BeautifulSoup(html,'html.parser')
body = soup.find('div', attrs={'class':'entry-content'})
paras = [normalize('NFKD',para.get_text()) for para in body.find_all('p')]
results.append(paras)
return results
Run Code Online (Sandbox Code Playgroud) aiohttp ×10
python ×10
python-asyncio ×10
asynchronous ×3
async-await ×2
python-3.x ×2
logging ×1
rest ×1