标签: aiohttp

Asyncio和aiohttp将所有urls路径路由到处理程序

我很难找到匹配所有传入网址的通配符网址匹配模式.这只匹配一个只有主机名的url:

import asyncio
from aiohttp import web

@asyncio.coroutine
def handle(request):
    print('there was a request')
    text = "Hello "
    return web.Response(body=text.encode('utf-8'))

@asyncio.coroutine
def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', handle)

    srv = yield from loop.create_server(app.make_handler(),
                                        '127.0.0.1', 9999)
    print("Server started at http://'127.0.0.1:9999'")
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass 
Run Code Online (Sandbox Code Playgroud)

因此,无论路径如何,它都应该在有请求时随时调用处理程序.如果是http://127.0.0.1:9999/http://127.0.0.1:9999/test/this/test/

我在这里了一下http://aiohttp.readthedocs.org/en/stable/web.html#aiohttp-web-variable-handler没有成功找到合适的线索

python python-3.x python-asyncio aiohttp

17
推荐指数
1
解决办法
5318
查看次数

Asyncio + aiohttp - redis Pub/Sub和websocket在单个处理程序中读/写

我正在玩aiohttp,看看它将如何作为带有websocket连接的移动应用程序的服务器应用程序.

这是一个简单的"Hello world"示例(这里是gist):

import asyncio
import aiohttp
from aiohttp import web


class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        print('Connection opened')
        try:
            while True:
                msg = yield from ws.receive()
                ws.send_str(msg.data + '/answer')
        except:
            pass
        finally:
            print('Connection closed')
        return ws


if __name__ == "__main__":
    app = aiohttp.web.Application()
    app.router.add_route('GET', '/ws', WebsocketEchoHandler())

    loop = asyncio.get_event_loop()
    handler = app.make_handler()

    f = loop.create_server(
        handler,
        '127.0.0.1',
        8080,
    )

    srv = loop.run_until_complete(f)
    print("Server started at {sock[0]}:{sock[1]}".format(
        sock=srv.sockets[0].getsockname()
    ))
    try:
        loop.run_forever()
    except …
Run Code Online (Sandbox Code Playgroud)

python redis python-asyncio aiohttp

16
推荐指数
1
解决办法
7004
查看次数

aiohttp.TCPConnector(带限制参数)vs asyncio.Semaphore用于限制并发连接数

我想我想通过制作一个允许你在一个下载多个资源的简单脚本来学习新的python异步等待语法,更具体地说是asyncio模块.

但是现在我被卡住了.

在研究时,我遇到了两个限制并发请求数量的选项:

  1. 将aiohttp.TCPConnector(带限制参数)传递给aiohttp.ClientSession或
  2. 使用asyncio.Semaphore.

是否有首选选项,或者如果您只想限制并发连接数,它们是否可以互换使用?性能方面(大致)是否相等?

两者似乎都有默认值100并发连接/操作.如果我只使用信号量限制为500,那么aiohttp内部会隐式地将我锁定为100个并发连接吗?

这对我来说都是非常新的和不清楚的.请随时指出我的任何误解或我的代码中的缺陷.

这是我的代码目前包含两个选项(我应该删除哪些?):

奖金问题:

  1. 如何处理(最好重试x次)出现错误的coros?
  2. coro完成后,保存返回数据(通知我的DataHandler)的最佳方法是什么?我不希望最后全部保存,因为我可以尽快开始处理结果.

小号

import asyncio
from tqdm import tqdm
import uvloop as uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth

# You can ignore this class
class DummyDataHandler(DataHandler):
    """Takes data and stores it somewhere"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def take(self, origin_url, data):
        return True

    def done(self):
        return None

class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):

        self.concurrent_connections = concurrent_connections
        self.silent = silent

        self.data_handler = data_handler or DummyDataHandler()

        self.sending_bar = …
Run Code Online (Sandbox Code Playgroud)

python async-await python-asyncio aiohttp python-3.5

16
推荐指数
1
解决办法
823
查看次数

Python aiohttp/asyncio - 如何处理返回的数据

我正在使用aiohttp将一些同步代码移动到asyncio.同步代码需要15分钟才能运行,所以我希望能够改进这一点.

我有一些工作代码从一些网址获取数据并返回每个网址的主体.但这只是针对1个实验室网站,我有70多个实际网站.

因此,如果我有一个循环来创建所有网站的所有网址列表,这些网址将在列表中处理700个网址.现在处理它们我不认为是一个问题?

但对结果做'东西',我不知道如何编程?我已经有了代码,它会对返回的每个结果做"填充",但我不确定如何针对正确的结果类型进行编程.

代码运行时是否会处理所有网址并根据运行时间返回未知顺序?

我是否需要一个能处理任何类型结果的函数?

import asyncio, aiohttp, ssl
from bs4 import BeautifulSoup

def page_content(page):
    return BeautifulSoup(page, 'html.parser')


async def fetch(session, url):
    with aiohttp.Timeout(15, loop=session.loop):
        async with session.get(url) as response:
            return page_content(await response.text())

async def get_url_data(urls, username, password):
    tasks = []
    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session:
        for i in urls:
            task = asyncio.ensure_future(fetch(session, i))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        # you now have …
Run Code Online (Sandbox Code Playgroud)

python python-asyncio aiohttp

15
推荐指数
2
解决办法
2121
查看次数

Asyncio RuntimeError:事件循环已关闭

我正在尝试使用Asyncio和aiohttp库发出一堆请求(~1000),但我遇到了一个我找不到太多信息的问题.

当我用10个网址运行这个代码时,它运行得很好.当我用100多个网址运行它时,它会中断并给我RuntimeError: Event loop is closed错误.

import asyncio
import aiohttp


@asyncio.coroutine
def get_status(url):
    code = '000'
    try:
        res = yield from asyncio.wait_for(aiohttp.request('GET', url), 4)
        code = res.status
        res.close()
    except Exception as e:
        print(e)
    print(code)


if __name__ == "__main__":
    urls = ['https://google.com/'] * 100
    coros = [asyncio.Task(get_status(url)) for url in urls]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(coros))
    loop.close()
Run Code Online (Sandbox Code Playgroud)

堆栈跟踪可以在这里找到.

任何帮助或洞察都会非常感激,因为我现在已经敲了几个小时.显然,这表明事件循环已经关闭,应该仍然是开放的,但我不知道这是怎么可能的.

python python-3.x python-asyncio aiohttp

14
推荐指数
2
解决办法
1万
查看次数

如何根据状态码重试异步 aiohttp 请求

我正在使用 api,有时它会给出一些奇怪的状态代码,只需重试相同的请求就可以修复这些代码。我正在使用 aiohttp 异步地向这个 api 发出请求

我也在使用退避库来重试请求,但是似乎仍然没有重试 401 请求。

   @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=11, max_time=60)
    async def get_user_timeline(self, session, user_id, count, max_id, trim_user, include_rts, tweet_mode):

        params = {
            'user_id': user_id,
            'trim_user': trim_user,
            'include_rts': include_rts,
            'tweet_mode': tweet_mode,
            'count': count
        }


        if (max_id and max_id != -1):
            params.update({'max_id': max_id})

        headers = {
            'Authorization': 'Bearer {}'.format(self.access_token)    
        }

        users_lookup_url = "/1.1/statuses/user_timeline.json"

        url = self.base_url + users_lookup_url

        async with session.get(url, params=params, headers=headers) as response:
            result = await response.json()
            response = {
                'result': result,
                'status': response.status,
                'headers': response.headers …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio aiohttp

14
推荐指数
2
解决办法
1万
查看次数

如何模拟 aiohttp.ClientSession 做出的响应?

我正在使用 aiohttp 发出异步请求,我想测试我的代码。我想模拟 aiohttp.ClientSession 发送的请求。我正在寻找类似于响应处理requestslib模拟的方式。

我如何模拟aiohttp.ClientSession?

# sample method
async def get_resource(self, session):
    async with aiohttp.ClientSession() as session:
        response = await self.session.get("some-external-api.com/resource")
        if response.status == 200:
            result = await response.json()
            return result

        return {...}

# I want to do something like ...
aiohttp_responses.add(
    method='GET', 
    url="some-external-api.com/resource", 
    status=200, 
    json={"message": "this worked"}
)

async def test_get_resource(self):
    result = await get_resource()
    assert result == {"message": "this worked"}
Run Code Online (Sandbox Code Playgroud)
  • 我已经通读了aiohttp 测试文档。似乎他们涵盖了模拟对您的 Web 服务器的传入请求,但我不确定这是否有助于我模拟对传出请求的响应

编辑

我在几个项目中使用了https://github.com/pnuckowski/aioresponses,它非常适合我的需求。

python-asyncio aiohttp pytest-aiohttp

14
推荐指数
2
解决办法
4794
查看次数

使用tqdm的asyncio aiohttp进度条

我正在尝试集成一个tqdm进度条来监视aiohttp在Python 3.5中生成的POST请求.我有一个工作进度条,但似乎无法使用收集结果as_completed().指针感激不尽.

我发现的示例建议使用以下模式,该模式与Python 3.5 async def定义不兼容:

for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(coros)):
    yield from f
Run Code Online (Sandbox Code Playgroud)

在没有进度条的情况下工作(虽然部分编辑)异步代码:

def async_classify(records):

    async def fetch(session, name, sequence):
        url = 'https://app.example.com/api/v0/search'
        payload = {'sequence': str(sequence)}
        async with session.post(url, data=payload) as response:
            return name, await response.json()

    async def loop():
        auth = aiohttp.BasicAuth(api_key)
        conn = aiohttp.TCPConnector(limit=100)
        with aiohttp.ClientSession(auth=auth, connector=conn) as session:
            tasks = [fetch(session, record.id, record.seq) for record in records]
            responses = await asyncio.gather(*tasks)    
        return OrderedDict(responses)
Run Code Online (Sandbox Code Playgroud)

这是我修改的失败尝试loop():

async def …
Run Code Online (Sandbox Code Playgroud)

progress-bar python-asyncio aiohttp python-3.5 tqdm

13
推荐指数
1
解决办法
4775
查看次数

多个aiohttp Application()在同一个进程中运行?

两个aiohttp.web.Application()对象可以在同一个进程中运行,例如在不同的端口上运行吗?

我看到一堆aiohttp代码示例:

from aiohttp import web
app = web.Application()
app.router.add_get('/foo', foo_view, name='foo')
web.run_app(app, host='0.0.0.0', port=10000)
Run Code Online (Sandbox Code Playgroud)

我想知道是否有一些等价物web.Applications()可以配置多个同时运行.就像是:

from aiohttp import web
app1 = web.Application()
app1.router.add_get('/foo', foo_view, name='foo')
app2 = web.Application()
app2.router.add_get('/bar', bar_view, name='bar')
# This is the wishful thinking code:
web.configure_app(app1, host='0.0.0.0', port=10000)
web.configure_app(app2, host='0.0.0.0', port=10001)
web.run_apps()
Run Code Online (Sandbox Code Playgroud)

我的用例是我有一个现有的python web框架来做这种事情,我正在构建一个类似于python 3.6和aiohttp的原型.

我知道多个python服务器可以运行在例如nginx之后(另请参阅http://aiohttp.readthedocs.io/en/stable/deployment.html); 那不是我追求的.我想探索两个具有相同asyncio事件循环的aiohttp Web服务器的可能性,在同一个python进程中运行,在两个不同的端口上提供服务.

python python-asyncio aiohttp

13
推荐指数
2
解决办法
2652
查看次数

如何重用aiohttp ClientSession池?

文档说要重用ClientSession:

不要为每个请求创建会话.很可能每个应用程序都需要一个会话来完成所有请求.

会话内部包含连接池,连接重用和保持活动(两者都默认打开)可以加快总体性能.1

但是在文档中似乎没有任何解释如何做到这一点?有一个例子可能是相关的,但它没有说明如何在其他地方重用池:http://aiohttp.readthedocs.io/en/stable/client.html#keep-alive-connection-pooling-and-cookie-分享

这样的事情是正确的做法吗?

@app.listener('before_server_start')
async def before_server_start(app, loop):
    app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
    app.http_session_pool = aiohttp.ClientSession()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    app.http_session_pool.close()
    app.pg_pool.close()


@app.post("/api/register")
async def register(request):
    # json validation
    async with app.pg_pool.acquire() as pg:
        await pg.execute()  # create unactivated user in db
        async with app.http_session_pool as session:
            # TODO send activation email using SES API
            async with session.post('http://httpbin.org/post', data=b'data') as resp:
                print(resp.status)
                print(await resp.text())
        return HTTPResponse(status=204)
Run Code Online (Sandbox Code Playgroud)

python-asyncio aiohttp sanic

13
推荐指数
1
解决办法
3001
查看次数