bal*_*lki 19 python python-3.x async-await python-asyncio python-3.5
我有很少的阻止功能foo,bar我无法改变那些(一些我无法控制的内部库.与一个或多个网络服务交谈).我如何将其用作异步?我不想做以下事情.
results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)
这将是低效的,因为我foo在等待第一个输入时可以调用第二个输入,并且相同bar.如何包装它们,这样它们与ASYNCIO(即新的使用async,await语法)?
让我们假设这些函数是可重入的.即,foo当先前foo正在处理时再次调用是可以的.
更新
用可重复使用的装饰器扩展答案.点击这里举例.
def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
    return inner
Nic*_*ger 18
这里有两个问题:第一,如何异步运行阻塞代码,第二,如何并行运行异步代码(asyncio是单线程的,所以GIL仍然适用,所以它不是真正的并发,但是我离题了.
可以使用asyncio.ensure_future创建并行任务,如此处所述.
要运行同步代码,您需要在执行程序中运行阻塞代码.例:
import concurrent.futures
import asyncio
import time
def blocking(delay):
    time.sleep(delay)
    print('Completed.')
async def non_blocking(loop, executor):
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),
            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),
            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
loop.run_until_complete(non_blocking(loop, executor))
如果你想使用for循环安排这些任务(如你的例子),你有几种不同的策略,但底层的方法是使用for循环(或列表推导等)安排任务,等待它们与asyncio.等待,然后检索结果.例:
done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)
# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]
use*_*939 12
现在您还可以asyncio.to_thread在单独的线程中异步运行函数。
import asyncio
import time
def blocking():
    print("Started!")
    time.sleep(5)
    print("Finished!")
async def non_blocking():
    print("Started!")
    await asyncio.sleep(5)
    print("Finished!")
async def main():
    print("Blocking function:")
    start_time = time.time()
    await asyncio.gather(*(asyncio.to_thread(blocking) for i in range(5)))
    print("--- %s seconds ---" % (time.time() - start_time))
    print("Non-blocking function:")
    start_time = time.time()
    await asyncio.gather(*(non_blocking() for i in range(5)))
    print("--- %s seconds ---" % (time.time() - start_time))
asyncio.run(main())
输出:
Blocking function:
Started!
Started!
Started!
Started!
Started!
Finished!
Finished!
Finished!
Finished!
Finished!
--- 5.018239736557007 seconds ---
Non-blocking function:
Started!
Started!
Started!
Started!
Started!
Finished!
Finished!
Finished!
Finished!
Finished!
--- 5.005802392959595 seconds ---
文档:https://docs.python.org/3.11/library/asyncio-task.html#asyncio.to_thread
扩展接受的答案以实际解决问题。
注意:需要python 3.7+
import functools
from urllib.request import urlopen
import asyncio
def legacy_blocking_function():  # You cannot change this function
    r = urlopen("https://example.com")
    return r.read().decode()
def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
    return inner
@run_in_executor
def foo(arg):  # Your wrapper for async use
    resp = legacy_blocking_function()
    return f"{arg}{len(resp)}"
@run_in_executor
def bar(arg):  # Another wrapper
    resp = legacy_blocking_function()
    return f"{len(resp)}{arg}"
async def process_input(inp):  # Modern async function (coroutine)
    res = await foo(inp)
    res = f"XXX{res}XXX"
    return await bar(res)
async def main():
    inputs = ["one", "two", "three"]
    input_tasks = [asyncio.create_task(process_input(inp)) for inp in inputs]
    print([await t for t in asyncio.as_completed(input_tasks)])
    # This doesn't work as expected :(
    # print([await t for t in asyncio.as_completed([process_input(inp) for inp in input_tasks])])
if __name__ == '__main__':
asyncio.run(main())
单击此处以获取该示例的最新版本并发送拉取请求。
import asyncio
from time import sleep
import logging
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s %(thread)s %(funcName)s %(message)s")
def long_task(t):
    """Simulate long IO bound task."""
    logging.info("2. t: %s", t)
    sleep(t)
    logging.info("4. t: %s", t)
    return t ** 2
async def main():
    loop = asyncio.get_running_loop()
    inputs = range(1, 5)
    logging.info("1.")
    futures = [loop.run_in_executor(None, long_task, i) for i in inputs]
    logging.info("3.")
    results = await asyncio.gather(*futures)
    logging.info("5.")
    for (i, result) in zip(inputs, results):
        logging.info("6. Result: %s, %s", i, result)
if __name__ == "__main__":
    asyncio.run(main())
输出:
2020-03-18 17:13:07,523 23964 main 1.
2020-03-18 17:13:07,524 5008 long_task 2. t: 1
2020-03-18 17:13:07,525 21232 long_task 2. t: 2
2020-03-18 17:13:07,525 22048 long_task 2. t: 3
2020-03-18 17:13:07,526 25588 long_task 2. t: 4
2020-03-18 17:13:07,526 23964 main 3.
2020-03-18 17:13:08,526 5008 long_task 4. t: 1
2020-03-18 17:13:09,526 21232 long_task 4. t: 2
2020-03-18 17:13:10,527 22048 long_task 4. t: 3
2020-03-18 17:13:11,527 25588 long_task 4. t: 4
2020-03-18 17:13:11,527 23964 main 5.
2020-03-18 17:13:11,528 23964 main 6. Result: 1, 1
2020-03-18 17:13:11,528 23964 main 6. Result: 2, 4
2020-03-18 17:13:11,529 23964 main 6. Result: 3, 9
2020-03-18 17:13:11,529 23964 main 6. Result: 4, 16