python - 在 gRPC 无限流期间检测客户端丢失

Fel*_*Jun 4 python stream grpc

我正在实现一个无限流响应,比如使用 gRPC 架构的发布/订阅模式。

有一个端点打开一个响应流并保持它直到客户端掉线。为此,我存储了一个键值哈希,其中键是 gRPC 上下文,值是我用来轮询发送消息的队列。

我的端点代码如下所示:

def StreamTrades(self, request, context):
    self.feeds[context] = queue.Queue()

    callback_queue = queue.Queue()

    def remove_feed():
        if self.feeds.get(context) is not None:
            del self.feeds[context]

    def stop_stream():
        remove_feed()

        def raise_stop_stream_exception():
            raise StopStream('stopping stream')

        callback_queue.put(raise_stop_stream_exception)

    context.add_callback(stop_stream)

    def output_generator():
        while True:
            try:
                try:
                    callback = callback_queue.get(False)
                    callback()
                except queue.Empty:
                    pass
                if self.feeds.get(context) is not None:
                    trade = self.feeds[context].get()
                    if isinstance(trade, trades_pb2.Trade):
                        yield trade
                else:
                    raise StopStream('stopping stream')
            except IndexError:
                pass
            except StopStream:
                return

    return output_generator()
Run Code Online (Sandbox Code Playgroud)

此代码适用于订阅和发布对客户端的更改。但是有一个与取消订阅相关的问题。检测客户端丢失的好方法是什么?使用 Context.add_callback(callBack) 似乎不起作用,因为只有在服务器完成并关闭流时才会调用回调。当客户端不再存在时,生成器不会引发任何类型的状态。我看到在 Java 中,当在 streamObserver 中调用 onNext 并且没有客户端时抛出 StatusRuntimeException 和 Status.CANCELLED,它允许延迟取消订阅,这对我来说已经足够了。

有没有办法检测客户端在响应流期间断开连接?

小智 7

ServicerContext.add_callback当客户端断开连接时,应调用您注册的回调;它没有被调用表明您正在遭受此错误。这是不是的情况下,“当服务器完成并关闭流回调仅被称为”。