Fel*_*Jun 4 python stream grpc
我正在实现一个无限流响应,比如使用 gRPC 架构的发布/订阅模式。
有一个端点打开一个响应流并保持它直到客户端掉线。为此,我存储了一个键值哈希,其中键是 gRPC 上下文,值是我用来轮询发送消息的队列。
我的端点代码如下所示:
def StreamTrades(self, request, context):
self.feeds[context] = queue.Queue()
callback_queue = queue.Queue()
def remove_feed():
if self.feeds.get(context) is not None:
del self.feeds[context]
def stop_stream():
remove_feed()
def raise_stop_stream_exception():
raise StopStream('stopping stream')
callback_queue.put(raise_stop_stream_exception)
context.add_callback(stop_stream)
def output_generator():
while True:
try:
try:
callback = callback_queue.get(False)
callback()
except queue.Empty:
pass
if self.feeds.get(context) is not None:
trade = self.feeds[context].get()
if isinstance(trade, trades_pb2.Trade):
yield trade
else:
raise StopStream('stopping stream')
except IndexError:
pass
except StopStream:
return
return output_generator()
Run Code Online (Sandbox Code Playgroud)
此代码适用于订阅和发布对客户端的更改。但是有一个与取消订阅相关的问题。检测客户端丢失的好方法是什么?使用 Context.add_callback(callBack) 似乎不起作用,因为只有在服务器完成并关闭流时才会调用回调。当客户端不再存在时,生成器不会引发任何类型的状态。我看到在 Java 中,当在 streamObserver 中调用 onNext 并且没有客户端时抛出 StatusRuntimeException 和 Status.CANCELLED,它允许延迟取消订阅,这对我来说已经足够了。
有没有办法检测客户端在响应流期间断开连接?
| 归档时间: |
|
| 查看次数: |
2213 次 |
| 最近记录: |