下面是一个收集 URL 长度的简单程序。
import aiohttp
import asyncio
from time import perf_counter
URLS = ['http://www.cnn.com', 'http://www.huffpost.com', 'http://europe.wsj.com',
'http://www.bbc.co.uk', 'http://failfailfail.com']
async def async_load_url(url, session):
try:
async with session.get(url) as resp:
content = await resp.read()
print(f"{url!r} is {len(content)} bytes")
except IOError:
print(f"failed to load {url}")
async def main():
async with aiohttp.ClientSession() as session:
tasks = [async_load_url(url, session) for url in URLS]
await asyncio.wait(tasks)
if __name__ == "__main__":
start = perf_counter()
asyncio.run(main())
elapsed = perf_counter() - start
print(f"\nTook {elapsed} seconds")
Run Code Online (Sandbox Code Playgroud)
为什么以下代码在 python 3.9 中失败并出现运行时错误并忽略异常?如何修复它? …
我有一个使用 Aiohttp 的非常简单的文件服务器:
import os.path
from os import listdir
import asyncio
from aiohttp import web
import aiohttp_jinja2
import jinja2
@aiohttp_jinja2.template('template2.html')
@asyncio.coroutine
def get(request):
root = os.path.abspath(os.path.dirname(__file__))
files = [file_name for file_name in listdir(os.path.join(root,'files'))
if os.path.isfile(os.path.join(root, 'files', file_name))]
return {'files': files}
if __name__ == "__main__":
app = web.Application()
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('/root/async'))
app.router.add_route('GET', '/', get)
app.router.add_static('/files/' , '/root/async/files')
loop = asyncio.get_event_loop()
f = loop.create_server(app.make_handler(), '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
print(' serving on ', srv.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
Run Code Online (Sandbox Code Playgroud)
当我使用 siege 工具测试它时,例如,这样的命令:
siege …Run Code Online (Sandbox Code Playgroud) 我正在编写应用程序,它每秒扫描目录,检查新文件,如果它们出现 - 通过POST请求发送它们并执行存档.假设可以出现在目录中的文件数量可以从10到100 - 我决定使用asyncio和aiohttp来同时发送请求.
码:
import os
import aiohttp
from aiohttp.client import ClientSession
BASE_DIR = '/path/to'
ARCHIVE_DIR = '/path/to/archive'
async def scan():
while True:
await asyncio.sleep(1)
for file in os.listdir(BASE_DIR):
if os.path.join(BASE_DIR, file).endswith('jpg'):
asyncio.ensure_future(publish_file(file))
async def publish_file(file):
async with ClientSession(loop=loop) as session:
async with session.post(url=url, data={'photo': open(os.path.join(BASE_DIR, file), 'rb')}) as response:
if response.status == 200:
await move_to_archive(file)
async def move_to_archive(file):
os.rename(os.path.join(BASE_DIR, file), os.path.join(ARCHIVE_DIR, file))
loop = asyncio.get_event_loop()
coros = [
asyncio.ensure_future(scan())
]
loop.run_until_complete(asyncio.wait(coros))
Run Code Online (Sandbox Code Playgroud)
所以,问题是:如果我要发送的请求的并发,这是一个很好的做法,协同程序添加到循环是这样的:asyncio.ensure_future(publish_file(file))?
我有一个抓取工具(基于 Python 3.4.2 和 asyncio/aiohttp 库)和一堆链接(> 10K)来检索一些少量数据。部分爬虫代码:
@asyncio.coroutine
def prepare(self, links):
semaphore = asyncio.Semaphore(self.limit_concurrent)
tasks = []
result = []
tasks = [self.request_data(link, semaphore) for link in links]
for task in asyncio.as_completed(tasks):
response = yield from task
if response:
result.append(response)
task.close()
return result
@asyncio.coroutine
def request_data(self, link, semaphore):
...
with (yield from semaphore):
while True:
counter += 1
if counter >= self.retry:
break
with aiohttp.Timeout(self.timeout):
try:
response = yield from self.session.get(url, headers=self.headers)
body = yield from response.read()
break
except asyncio.TimeoutError …Run Code Online (Sandbox Code Playgroud) 我将如何在同一个程序中组合两个 asyncio 库?
对于上下文,我希望使用Discord api与 aiohttp,它们都是异步事件循环驱动的。我可能也想在混合中添加一个异步 irc 库。
但是,我不明白它们将如何一起操作。我相信理论上我会实现该程序,以便所有类实例都使用相同的 asyncio 事件循环实例,然后将所有运行函数合并为一个,最终调用事件循环。
但是,我想知道是否有更优雅的方法来做到这一点?
我使用 aiohttp 处理 websockets 和 postgresql 监听/通知,使用 aiopg 驱动程序作为服务器之间的传输队列。
我的服务器设置的一部分:
def init(loop):
...
app = Application(loop=loop)
app.on_startup.append(connect_db)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
return app
def create_app(loop):
loop = asyncio.get_event_loop()
app = init(loop)
return app
Run Code Online (Sandbox Code Playgroud)
这是我的信号:
dsn = 'dbname=test user=root password=test host=127.0.0.1'
async def connect_db(app):
app['db'] = await aiopg.connect(dsn)
return app
async def listen_events_from_db(app):
cursos = await app['db'].cursor()
await cursos.execute("LISTEN test")
try:
while True:
msg = await app['db'].notifies.get()
# Here will be a coroutine which will send msg to connected websockets
print('msg: ', msg.payload) …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 aiohttp/python 访问一些网站。目前我可以在 Google Chrome 中使用扩展程序导出网站的 cookie,格式如下
.domain.name TRUE / FALSE 1547016401 cookies 值
我可以加载它
import http.cookiejar
cj = http.cookiejar.MozillaCookieJar('cookies.txt')
cj.load()
cookies = {}
for each in cj:
cookies[each.name] = each.value
Run Code Online (Sandbox Code Playgroud)
并cookies在 aiohttp 中使用,例如:
async with ClientSession(cookies=cookies) as session:
Run Code Online (Sandbox Code Playgroud)
有没有更优雅的方法来做到这一点?浏览了 aiohttp 的文档,但没有找到它们。
我正在使用 asyncio 和 aiohttp 来制作异步抓取工具。出于某种原因,在我达到 150+ 请求后,它开始变慢。第一个异步在我获得链接的地方运行良好。第二个是我遇到缓慢发生的问题的地方。就像在 200 之后,一个请求需要 1 分钟。知道为什么吗?我是否错误地使用了 Asyncio 或 aiohttp?编辑:我在 7gb ram 上运行这个本地,所以我不认为我内存不足。
import aiohttp
import asyncio
import async_timeout
import re
from lxml import html
import timeit
from os import makedirs,chmod
basepath = ""
start = timeit.default_timer()
novel = ""
novel = re.sub(r"[^a-zA-Z0-9 ]+/", "", novel)
novel = re.sub(r" ", "-", novel)
novel_url = {}
@asyncio.coroutine
def get(*args, **kwargs):
response = yield from aiohttp.request('GET', *args, **kwargs)
return (yield from response.text())
def scrape_links(page):
url = html.fromstring(page)
links …Run Code Online (Sandbox Code Playgroud) 我还不完全理解 asyncio 和 aiohttp 是如何工作的。
我正在尝试从 URL 列表中发出一堆异步 api 请求,并将它们保存为变量,以便稍后处理它们。
到目前为止,我正在生成没有问题的列表并设置请求框架。
urls = []
for i in range(0,20):
urls.append('https://api.binance.com/api/v1/klines?symbol={}&interval=
{}&limit={}'.format(pairs_list_pairs[i],time_period,
pull_limit))
import asyncio
import aiohttp
async def request(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
async def main():
results = await asyncio.gather(
request(urls[0]),
request(urls[1]),
)
print(len(results))
print(results)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
Run Code Online (Sandbox Code Playgroud)
如果我使用索引手动一一输入我的请求(如下所示),我就可以发出请求。但问题是我的列表中有超过 100 个 api 请求,我不想手动输入这些请求。如何迭代我的列表?另外,如何将结果保存到变量中?当脚本结束时,它不会在任何地方保存“结果”。
async def main():
results = await asyncio.gather(
request(urls[0]),
request(urls[1]),
)
print(len(results))
print(results)
Run Code Online (Sandbox Code Playgroud)
下面是一些复制代码的示例 URL: …
aiohttp ×10
python ×8
python-3.x ×2
asynchronous ×1
coroutine ×1
event-loop ×1
performance ×1
postgresql ×1
python-3.5 ×1
request ×1
sockets ×1