如何将第三方库中的函数转换为异步函数?

Sta*_*tec 7 python asynchronous python-asyncio

我正在使用我的 Raspberry Pi 以及pigpiowebsockets库。

我希望我的程序异步运行(即我将用作async def main入口点)。

pigpio库期望调用一个同步回调函数来响应事件,这很好,但在该回调中我想从库中调用另一个异步函数websocket

所以它看起来像:

def sync_cb(): # <- This can not be made async, therefore I can not use await
   [ws.send('test') for ws in connected_ws] # <- This is async and has to be awaited
Run Code Online (Sandbox Code Playgroud)

目前我可以让它工作:

def sync_cb():
    asyncio.run(asyncio.wait([ws.send('test') for ws in connected_ws]))
Run Code Online (Sandbox Code Playgroud)

文档asyncio.run说不鼓励这种使用。

因此,我的同步回调需要调用ws.send(也来自第三方库),该回调与同步函数是异步的。

另一个有效的选择是:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.gather(*[ws.send(json.dumps(message)) for ws in connected_ws]))
Run Code Online (Sandbox Code Playgroud)

但是创建和设置偶数循环的三行听起来对于运行一个简单的异步函数来说很多。

我的问题是:

  • cb是否可以在需要同步回调的地方替换异步函数(即在此示例中是否有办法实现异步)
  • asyncio.run而且,我通过使用并asyncio.wait调用一个简单的异步方法(在列表理解中)会产生什么样的开销

Art*_*nov 0

是否可以在需要同步回调的地方替换异步函数

有可能的。您可以在单独的线程中运行事件循环并async在那里发出代码,但您必须考虑 GIL。

import asyncio
import threading

class Portal:

    def __init__(self, stop_event):
        self.loop = asyncio.get_event_loop()
        self.stop_event = stop_event

    async def _call(self, fn, args, kwargs):
        return await fn(*args, **kwargs)

    async def _stop(self):
        self.stop_event.set()

    def call(self, fn, *args, **kwargs):
        return asyncio.run_coroutine_threadsafe(self._call(fn, args, kwargs), self.loop)

    def stop(self):
        return self.call(self._stop)

def create_portal():
    portal = None

    async def wait_stop():
        nonlocal portal
        stop_event = asyncio.Event()
        portal = Portal(stop_event)
        running_event.set()
        await stop_event.wait()

    def run():
        asyncio.run(wait_stop())

    running_event = threading.Event()
    thread = threading.Thread(target=run)
    thread.start()
    running_event.wait()

    return portal
Run Code Online (Sandbox Code Playgroud)

使用示例:

async def test(msg):
    await asyncio.sleep(0.5)
    print(msg)
    return "HELLO " + msg

# it'll run a new event loop in separate thread
portal = create_portal()
# it'll call `test` in the separate thread and return a Future 
print(portal.call(test, "WORLD").result())
portal.stop().result()
Run Code Online (Sandbox Code Playgroud)

在你的情况下:

def sync_cb():
    calls = [portal.call(ws.send, 'test') for ws in connected_ws]
    # if you want to get results from these calls:
    # [c.result() for c in calls]
Run Code Online (Sandbox Code Playgroud)

而且,仅使用 asyncio.run 和 asyncio.wait 来调用简单的异步方法会产生什么样的开销

asyncio.run将创建一个新的事件循环并关闭它。如果不经常调用回调,很可能不会出现问题。但如果您也在另一个回调中使用asyncio.run,那么它们将无法同时工作。