处理 gRPC python asyncio 中的客户端取消?

bru*_*ton 2 python python-asyncio grpc grpc-python

先问问题,再看上下文。如何使用异步 gRPC python 服务器基于从客户端取消 RPC 来执行某些服务器端操作(例如清理)?

在我的微服务中,我有一个asynciogRPC 服务器,其主要 RPC 是双向流。

在客户端(也使用 asyncio),当我取消某些操作时,它会引发一个asyncio.CancelledError被捕获且不会被grpc核心重新引发的异常:

https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi#L679

    except asyncio.CancelledError:
        _LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method()))
Run Code Online (Sandbox Code Playgroud)

asyncio.CancelledError所以我不能依赖于在我自己的代码中捕获,因为它是预先捕获的并且不会重新引发。

共享上下文应该包含有关 RPC 是否在客户端被取消的信息,通过.cancel()从 RPC 调用进行调用并能够通过调用查看它是否被取消.cancelled()

https://grpc.github.io/grpc/python/grpc_asyncio.html#shared-context

抽象取消()

Cancels the RPC.

Idempotent and has no effect if the RPC has already terminated.

Returns

    A bool indicates if the cancellation is performed or not.
Return type

    bool
Run Code Online (Sandbox Code Playgroud)

摘要取消()

Return True if the RPC is cancelled.

The RPC is cancelled when the cancellation was requested with cancel().

Returns

    A bool indicates whether the RPC is cancelled or not.
Return type

    bool
Run Code Online (Sandbox Code Playgroud)

但是,此共享上下文不会附加到contextgRPC 生成的代码在服务器端提供给 RPC 的变量。(我不能跑context.cancelled()或者context.add_done_callback;他们不在场)

那么,问题又来了:如何使用异步 gRPC python 服务器基于从客户端取消 RPC 来执行某些服务器端操作(例如清理) ?

Lid*_*eng 5

谢谢你的帖子。我们已经意识到这个问题,并且添加对这两种方法的支持已在我们的路线图中。

对于短期解决方案,您可以使用 try-catch 和装饰器。客户端取消被视为asyncio.CancelledError方法内处理程序。这是一个修改后的helloworld 示例

服务器代码:

class Greeter(helloworld_pb2_grpc.GreeterServicer):

    async def SayHello(
            self, request: helloworld_pb2.HelloRequest,
            context: grpc.aio.ServicerContext) -> helloworld_pb2.HelloReply:
        try:
            await asyncio.sleep(4)
        except asyncio.CancelledError:
            print('RPC cancelled')
            raise
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
Run Code Online (Sandbox Code Playgroud)

客户端代码:

async def run() -> None:
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        call = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
        await asyncio.sleep(2)
        call.cancel()
        print(await call.code())

Run Code Online (Sandbox Code Playgroud)