如果事件循环已在运行,如何在方法中等待协程同步完成?

Tag*_*agc 5 python python-3.x python-asyncio

我正在尝试创建一个基于Python的CLI,该CLI通过websockets与Web服务进行通信。我遇到的一个问题是CLI间歇性地向Web服务发出的请求无法得到处理。查看来自Web服务的日志,我可以看到问题是由以下事实引起的:套接字关闭的同时(甚至在关闭之后)经常发出这些请求:

2016-09-13 13:28:10,930 [22 ] INFO  DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN  DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5  ] DEBUG DeviceBridge - Device bridge has closed
Run Code Online (Sandbox Code Playgroud)

在我的CLI中,我定义了一个类CommunicationService,负责处理与Web服务的所有直接通信。在内部,它使用该websockets程序包处理通信,该程序包本身建立在之上asyncio

CommunicationService 包含以下发送请求的方法:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))
    asyncio.ensure_future(self._ws.send(request))
Run Code Online (Sandbox Code Playgroud)

...在哪儿ws用另一个方法打开了一个websocket:

self._ws = await websockets.connect(websocket_address)
Run Code Online (Sandbox Code Playgroud)

我想要的是能够等待将来返回的将来asyncio.ensure_future,如果有必要,可以在之后短暂睡眠,以便在关闭websocket之前给网络服务时间处理请求。

但是,由于send_request是一种同步方法,因此不能简单地将await这些期货交易。使它异步将是没有意义的,因为没有什么可以等待它返回的协程对象。loop.run_until_complete由于循环在被调用时已经在运行,因此我也无法使用。

我发现有人描述了一个与我在mail.python.org上遇到的问题非常相似的问题。在该线程中发布的解决方案是在循环已经运行的情况下,使函数返回协程对象:

def aio_map(coro, iterable, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    coroutines = map(coro, iterable)
    coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)

    if loop.is_running():
        return coros
    else:
        return loop.run_until_complete(coros)
Run Code Online (Sandbox Code Playgroud)

这对我来说是不可能的,因为我正在使用PyRx(反应式框架的Python实现),send_request并且仅被称为Rx可观察者,这意味着返回值将被丢弃,并且对我的代码不可用:

class AnonymousObserver(ObserverBase):
    ...
    def _on_next_core(self, value):
        self._next(value)
Run Code Online (Sandbox Code Playgroud)

附带说明一下,我不确定这是否是asyncio经常遇到的某种问题,还是不确定是否只是遇到了问题,但我发现使用起来非常令人沮丧。在C#中(例如),我所需要做的可能只是以下内容:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))
    asyncio.ensure_future(self._ws.send(request))
Run Code Online (Sandbox Code Playgroud)

同时,asyncio“ wait”的版本无奈地返回了另一个我被迫丢弃的协程。

更新资料

我找到了解决此问题的方法,该方法似乎可行。我有一个异步回调,该回调在命令执行后且CLI终止之前被执行,因此我从此进行了更改...

async def after_command():
    await comms.stop()
Run Code Online (Sandbox Code Playgroud)

...对此:

async def after_command():
    await asyncio.sleep(0.25)  # Allow time for communication
    await comms.stop()
Run Code Online (Sandbox Code Playgroud)

不过,我仍然很高兴收到此问题的任何答案,以供将来参考。在其他情况下,我可能无法依靠这种变通方法,并且我仍然认为,最好在内部执行延迟,send_request这样的客户就CommunicationService不必担心时间问题,这是更好的做法。

关于文森特的问题:

您的循环是否在其他线程中运行,还是由某些回调调用send_request?

一切都在同一线程中运行-由回调调用。发生的是,我定义了所有命令以使用异步回调,并且在执行这些命令时,其中一些命令将尝试向Web服务发送请求。由于它们是异步的,因此只有在通过调用loop.run_until_complete在 CLI的顶层 -这意味着循环将在它们执行和发出此请求的中途运行(通过间接调用send_request)。

更新2

这是基于Vincent建议添加“完成”回调的解决方案。

_busy添加了一个新的布尔字段CommunicationService以表示是否正在进行通讯活动。

CommunicationService.send_request_busy在发送请求之前被修改为true,然后提供回调_ws.send以重置_busy完成:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    def callback(_):
        self._busy = False

    self._busy = True
    asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
Run Code Online (Sandbox Code Playgroud)

CommunicationService.stop 现在已实现,以等待在执行此标志之前将其设置为false:

async def stop(self) -> None:
    """
    Terminate communications with TestCube Web Service.
    """
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    while self._busy:
        await asyncio.sleep(0.1)

    # Allow short delay after final request is processed.
    await asyncio.sleep(0.1)

    self._listen_task.cancel()
    await asyncio.wait([self._listen_task, self._ws.close()])

    self._listen_task = None
    self._ws = None
    logger.info('Terminated connection to TestCube Web Service')
Run Code Online (Sandbox Code Playgroud)

这似乎也可行,至少以这种方式,所有通信时序逻辑都CommunicationService应按原样封装在类中。

更新3

基于Vincent提议的更好的解决方案。

取而代之的是self._busy我们self._send_request_tasks = []

新的send_request实现:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.append(task)
Run Code Online (Sandbox Code Playgroud)

新的stop实现:

async def stop(self) -> None:
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)
    ...
Run Code Online (Sandbox Code Playgroud)

Vin*_*ent 3

您可以使用一系列set任务:

self._send_request_tasks = set()
Run Code Online (Sandbox Code Playgroud)

使用安排任务ensure_future并使用清理add_done_callback

def send_request(self, request: str) -> None:
    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.add(task)
    task.add_done_callback(self._send_request_tasks.remove)
Run Code Online (Sandbox Code Playgroud)

并等待set任务完成:

async def stop(self):
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)
Run Code Online (Sandbox Code Playgroud)