如何更改此代码以使用上下文管理器?

Goo*_*ies 7 python python-3.x python-asyncio python-3.5 python-3.6

我正在尝试使用 aiohttp 和 asyncio,以多组凭据同时登录一个网站。在 create_tasks 函数中,我为每组凭据生成一个会话(session),并把它们放进一个会话列表。我不能在 login 函数中创建会话,因为同一个会话对象之后还会在整个代码中继续使用。我想做的是设计一种方法,用上下文管理器来处理会话的关闭(以避免因会话保持打开而导致的运行时错误)。

以下代码按预期工作(并发收集登录页面并在进程池中解析令牌),但它会与任务分开生成会话,并要求我在最后关闭它们.

from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import asyncio

# TODO: make this safe, handle exceptions

# Page requested by login() for every credential pair.
LOGIN_URL = "http://example.com/login"
# Sizes the process pool below and the semaphore created in the main block.
CLIENT_CNT = 10
# Pool that login() hands get_key() to via loop.run_in_executor().
proc_pool = ProcessPoolExecutor(CLIENT_CNT)

def get_key(text):
    """Extract the hidden ``authenticityToken`` value from a login page.

    Args:
        text: HTML markup of the login page.

    Returns:
        The ``value`` attribute of the hidden ``authenticityToken`` input
        inside the first ``<form>``, or ``None`` when the page has no form,
        the form has no such input, or the input has no ``value``.
    """
    soup = BeautifulSoup(text, "html.parser")
    form = soup.find("form")
    # find() returns None when nothing matches; bail out instead of raising
    # AttributeError on the next lookup.
    if form is None:
        return None
    token_input = form.find(
        "input", attrs={"type": "hidden", "name": "authenticityToken"}
    )
    if token_input is None:
        return None
    return token_input.get("value")

async def login(username:str, password:str, session:aiohttp.ClientSession, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None):
    """Fetch the login page through ``session`` and print its auth token.

    Args:
        username: Account name (accepted but not used by this function yet).
        password: Account password (accepted but not used by this function yet).
        session: Client session used to request ``LOGIN_URL``; it is owned by
            the caller and is not closed here.
        sem: Semaphore held for the whole fetch-and-parse step.
        loop: Event loop whose executor API runs the parser; defaults to
            ``asyncio.get_event_loop()``.
    """
    loop = loop or asyncio.get_event_loop()
    async with sem:
        async with session.get(LOGIN_URL) as resp:
            page = await resp.text()
        # The body is fully read, so the response is released before parsing.
        # run_in_executor() already returns an awaitable Future, so no
        # ensure_future() wrapper is needed.
        token = await loop.run_in_executor(proc_pool, get_key, page)
        print(token)

def create_tasks(usernames, passwords, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None):
    """Build one client session and one ``login`` coroutine per credential pair.

    Args:
        usernames: Iterable of account names.
        passwords: Iterable of passwords, paired positionally with ``usernames``.
        sem: Semaphore passed through to every ``login`` coroutine.
        loop: Event loop handed to the sessions and coroutines; defaults to
            ``asyncio.get_event_loop()``.

    Returns:
        A ``(tasks, sessions)`` tuple of two parallel lists. The coroutines
        are not started, and the caller is responsible for closing the
        sessions.
    """
    event_loop = loop if loop else asyncio.get_event_loop()
    credentials = list(zip(usernames, passwords))
    sessions = [aiohttp.ClientSession(loop=event_loop) for _ in credentials]
    tasks = [
        login(user, secret, client, sem, event_loop)
        for (user, secret), client in zip(credentials, sessions)
    ]
    return tasks, sessions

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    sem = asyncio.BoundedSemaphore(CLIENT_CNT)
    usernames = ("a", "b", "c", "d", "e", "f", "g")
    passwords = ("a", "b", "c", "d", "e", "f", "g")
    tasks, sessions = create_tasks(usernames, passwords, sem, loop)
    try:
        # Bind the coroutines to our loop explicitly: gather() lost its
        # ``loop`` argument in Python 3.10 and takes the loop from the
        # futures it is given.
        futures = [loop.create_task(task) for task in tasks]
        loop.run_until_complete(asyncio.gather(*futures))
    finally:
        # Close every session even when a login failed.
        for session in sessions:
            closing = session.close()
            # aiohttp 3.x returns a coroutine from close(); older releases
            # close synchronously.
            if asyncio.iscoroutine(closing):
                loop.run_until_complete(closing)
Run Code Online (Sandbox Code Playgroud)

我之前曾把 create_tasks 改成协程,写了一个包装类来实现异步迭代,并尝试使用

async with aiohttp.ClientSession() as session:
    tasks.append(login(u, p, session, sem, loop)
Run Code Online (Sandbox Code Playgroud)

但正如我所担心的,它报告说会话(session)在运行时已经被关闭。

etl*_*lsh 0

你没有真正说明你需要执行什么样的任务,只是简单的 GET 请求吗?

有更复杂的事情吗?

您希望每个用户名/密码都是特定的吗?

最后需要保存所有回复吗?

对于这段代码,我假设用户名/密码并不重要,但它可以很快改变。

我没有使用单独启动会话的方式,而是使用消费者/生产者模式。

每个消费者都通过上下文管理器持有自己的会话,而且不再需要信号量(因为使用了队列)。

import asyncio
from concurrent.futures import ProcessPoolExecutor

from aiohttp import ClientSession
from bs4 import BeautifulSoup

# Page requested once by every consumer before it starts draining the queue.
LOGIN_URL = "http://example.com/login"
# Sizes the process pool below and the work queue created in run().
CLIENT_CNT = 10
# Pool that init_consumer() hands get_key() to via loop.run_in_executor().
proc_pool = ProcessPoolExecutor(CLIENT_CNT)


def get_key(text):
    """Return the hidden ``authenticityToken`` value found in ``text``.

    Args:
        text: HTML markup of the login page.

    Returns:
        The ``value`` attribute of the hidden ``authenticityToken`` input in
        the first ``<form>``; ``None`` if the form, the input, or the
        attribute is missing.
    """
    soup = BeautifulSoup(text, "html.parser")
    form = soup.find("form")
    # find() yields None on no match, so guard before chaining lookups.
    if form is None:
        return None
    token_input = form.find(
        "input", attrs={"type": "hidden", "name": "authenticityToken"}
    )
    if token_input is None:
        return None
    return token_input.get("value")


async def init_consumer(username: str, password: str, loop, queue):
    """Open one session, fetch the login token, then process queued URLs.

    The session lives inside an ``async with`` block, so it is closed when
    the consumer stops, whether it stops normally or through an exception.

    Args:
        username: Account name (accepted but not used by this function yet).
        password: Account password (accepted but not used by this function yet).
        loop: Event loop for the session and executor calls; a falsy value
            falls back to ``asyncio.get_event_loop()``.
        queue: ``asyncio.Queue`` of URLs to fetch; a ``None`` item tells the
            consumer to stop.
    """
    loop = loop or asyncio.get_event_loop()
    async with ClientSession(loop=loop) as session:
        # Initialise the session with credentials here if needed; the
        # username/password are not used yet.
        async with session.get(LOGIN_URL) as login_resp:
            login_page = await login_resp.text()
        # run_in_executor() already returns an awaitable Future.
        token = await loop.run_in_executor(proc_pool, get_key, login_page)
        print(token)
        url = await queue.get()
        while url is not None:
            try:
                # Do things with session and queue
                async with session.get(url) as resp:
                    await resp.text()
            finally:
                # Always balance the get(), otherwise a failed request would
                # leave queue.join() waiting forever.
                queue.task_done()
            url = await queue.get()


async def generate_tasks(queue, consumer_count=None):
    """Feed the work queue, wait for it to drain, then stop the consumers.

    Args:
        queue: ``asyncio.Queue`` shared with the consumers.
        consumer_count: Number of ``None`` stop markers to enqueue, one per
            consumer. Defaults to ``queue.maxsize``.
    """
    tasks = ["http://www.example.com" for i in range(20)]
    # putting all tasks in queue
    for task in tasks:
        await queue.put(task)
    # waiting for all tasks to finish; join() is a coroutine and does nothing
    # unless it is awaited
    await queue.join()
    # Telling consumers to finish; put() is a coroutine as well
    if consumer_count is None:
        consumer_count = queue.maxsize
    for _ in range(consumer_count):
        await queue.put(None)


async def run(loop):
    """Start one consumer per credential pair and run them to completion.

    Args:
        loop: Event loop forwarded to every consumer.

    Raises:
        Exception: the first error raised by the producer or any consumer;
            the remaining tasks are cancelled before it propagates.
    """
    queue = asyncio.Queue(CLIENT_CNT)
    usernames = ("a", "b", "c", "d", "e", "f", "g")
    passwords = ("a", "b", "c", "d", "e", "f", "g")
    consumers = [asyncio.ensure_future(init_consumer(u, p, loop, queue)) for u, p in zip(usernames, passwords)]
    producer = asyncio.ensure_future(
        generate_tasks(queue, consumer_count=len(consumers))
    )
    pending = [producer] + consumers
    try:
        # Wait for the consumers too, so every session is closed by its
        # context manager before the event loop stops.
        await asyncio.gather(*pending)
    finally:
        # No-op for finished tasks; after a failure this stops the rest and
        # lets their ``async with`` blocks clean up.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


if __name__ == "__main__":
    # Drive the producer/consumer pipeline on the default event loop.
    event_loop = asyncio.get_event_loop()
    pipeline = run(loop=event_loop)
    event_loop.run_until_complete(pipeline)
Run Code Online (Sandbox Code Playgroud)