Python asyncio(aiohttp,aiofiles)

Dre*_*rew 2 python python-asyncio

我似乎很难理解蟒蛇asyncio.我没有编写任何代码,因为我看到的所有示例都是针对一次性运行的.创建一些协同程序,将它们添加到事件循环中,然后运行循环,它们运行在它们之间切换的任务,完成.这似乎对我没什么帮助.

我想使用asyncio不中断我的应用程序中的操作(使用pyqt5).我想创建一些函数,当在asyncio事件循环中调用run时,然后当它们完成时,它们会进行回调.

我的想象是.为asyncio创建一个单独的线程,创建循环并永远运行它.创建一些功能getFile(url, fp),get(url),readFile(file)等于是在UI,我有一个提交按钮的文本框,用户输入网址,点击提交,它下载文件.

但是,我看到的每个例子,我都看不到如何在正在运行的循环中添加协同程序.而且我没有看到如何在不增加运行循环的情况下做我想做的事情.

#!/bin/python3
import asyncio
import aiohttp
import threading

loop = asyncio.get_event_loop()

def async_in_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def _get(url, callback):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            result = await response.text()
            callback(result)
            return

def get(url, callback):
    asyncio.ensure_future(_get(url, callback))

thread = threading.Thread(target=async_in_thread, args=(loop, ))

thread.start()

def stop():
    loop.close()

def callme(data):
    print(data)
    stop()

get("http://google.com", callme)

thread.join()
Run Code Online (Sandbox Code Playgroud)

这是我的想象,但它不起作用.

use*_*342 5

要将coroutine添加到在不同线程中运行的循环,请使用asyncio.run_coroutine_threadsafe:

def get(url, callback):
    asyncio.run_coroutine_threadsafe(_get(url, callback))
Run Code Online (Sandbox Code Playgroud)

通常,当您从运行它的线程外部与事件循环交互时,您必须通过run_coroutine_threadsafe(对于协同程序)或loop.call_soon_threadsafe(对于函数)运行所有内容.例如,要停止循环,请使用loop.call_soon_threadsafe(loop.stop).还要注意,loop.close()不能在循环回调中调用,所以你应该在调用async_in_thread之后立即调用run_forever(),此时循环肯定停止运行.

asyncio的另一个问题是传递显式when_done回调并不是惯用的,因为asyncio暴露了期货的概念(类似于JavaScript承诺),它允许将回调附加到尚未可用的结果.例如,可以_get像这样写:

async def _get(url):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
Run Code Online (Sandbox Code Playgroud)

它不需要callback参数,因为任何感兴趣的一方都可以将其转换为任务使用,loop.create_taskadd_done_callback在任务完成时使用通知.例如:

def _get_with_callback(url, callback):
    loop = asyncio.get_event_loop()
    task = loop.create_task(_get(url))
    task.add_done_callback(lambda _fut: callback(task.result()))
Run Code Online (Sandbox Code Playgroud)

在您的情况下,您不直接处理任务,因为您的代码旨在与另一个线程的事件循环进行通信.但是,run_coroutine_threadsafe返回一个非常有用的值 - 一个完整的concurrent.futures.Future,可以用来注册完成的回调.callback您可以将未来对象公开给调用者,而不是接受参数:

def get(url):
    return asyncio.run_coroutine_threadsafe(_get(url), loop)
Run Code Online (Sandbox Code Playgroud)

现在调用者可以选择基于回调的方法:

future = get(url)
# call me when done
future.add_done_callback(some_callback)
# ... proceed with other work ...
Run Code Online (Sandbox Code Playgroud)

或者,在适当的时候,他们甚至可以等待结果:

# give me the response, I'll wait for it
result = get(url).result()
Run Code Online (Sandbox Code Playgroud)

后者根据定义阻塞,但由于事件循环在另一个线程中安全运行,因此它不受阻塞调用的影响.