Python 3.5异步/等待实际代码示例

n1_*_*n1_ 26 python asynchronous async-await python-asyncio python-3.5

我已经阅读了大量有关Python的3.5异步/等待事物的文章和教程.我不得不说我很困惑,因为有些人使用get_event_loop()和run_until_complete(),有些人使用ensure_future(),有些人使用asyncio.wait(),有些人使用call_soon().

看起来我有很多选择,但我不知道它们是完全相同还是有使用循环的情况,有些情况下你使用wait().

但事情是所有的例子都与asyncio.sleep()真实的慢速操作的模拟一起工作,它返回一个等待的对象.一旦我尝试将这一行换成一些真正的代码,整个事情就会失败.上面介绍的方法与我应该如何运行尚未准备好进行异步/等待的第三方库之间存在差异.我确实使用Quandl服务来获取一些股票数据.

 import asyncio
 import quandl

 async def slow_operation(n):
     # await asyncio.sleep(1) # Works because it's await ready.
     await quandl.Dataset(n) # Doesn't work because it's not await ready.


 async def main():
     await asyncio.wait([
         slow_operation("SIX/US9884981013EUR4"),
         slow_operation("SIX/US88160R1014EUR4"),
     ])

 # You don't have to use any code for 50 requests/day.
 quandl.ApiConfig.api_key = "MY_SECRET_CODE"

 loop = asyncio.get_event_loop()
 loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

我希望你明白我的感受是多么的失落,以及我希望并行运行的简单方法.

fre*_*ish 34

如果第三方库与之不兼容,async/await显然您无法轻松使用它.有两种情况:

  1. 假设库中的函数是异步的,它会给你一个回调,例如

    def fn(..., clb):
        ...
    
    Run Code Online (Sandbox Code Playgroud)

    所以你可以这样做:

    def on_result(...):
        ...
    
    fn(..., on_result)
    
    Run Code Online (Sandbox Code Playgroud)

    在这种情况下,您可以将这些函数包装到asyncio协议中,如下所示:

    from asyncio import Future
    
    def wrapper(...):
        future = Future()
        def my_clb(...):
            future.set_result(xyz)
        fn(..., my_clb)
        return future
    
    Run Code Online (Sandbox Code Playgroud)

    (future.set_exception(exc)在例外情况下使用)

    然后你可以在一些async函数中调用该包装器await:

    value = await wrapper(...)
    
    Run Code Online (Sandbox Code Playgroud)

    请注意,await适用于任何Future对象.您不必声明wrapperasync.

  2. 如果库中的函数是同步的,那么你可以在一个单独的线程中运行它(可能你会使用一些线程池).整个代码可能如下所示:

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize 10 threads
    THREAD_POOL = ThreadPoolExecutor(10)
    
    def synchronous_handler(param1, ...):
        # Do something synchronous
        time.sleep(2)
        return "foo"
    
    # Somewhere else
    async def main():
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
        ]
        await asyncio.wait(futures)
        for future in futures:
            print(future.result())
    
    with THREAD_POOL:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    
    Run Code Online (Sandbox Code Playgroud)

如果由于某种原因无法使用线程,那么使用这样的库只会使整个异步代码毫无意义.

但请注意,使用带异步的同步库可能是一个坏主意.你不会得到太多,但你会使代码复杂化很多.