How do I intercept headers from a call to one service and insert them into another request in gRPC-java?

use*_*511 0 grpc grpc-java

I have two servers: HelloServer and WorldServer.

Both implement the same proto file:

// The greeting service definition.
service GreeterService {
    // Sends a greeting
    rpc GreetWithHelloOrWorld (GreeterRequest) returns (GreeterReply) {}
    rpc GreetWithHelloWorld (GreeterRequest) returns (GreeterReply) {}
}

message GreeterRequest {
    string id = 1;
}

// The response message containing the greetings
message GreeterReply {
    string message = 1;
    string id = 2;
}

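I want to add traceIds to the requests. As I understand it, this is done by putting the traceId into a Metadata object.

Here is the test I use to check that the traceIds are passed along. A request is made to HelloServer, which in turn calls WorldServer, and a response is then returned.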
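The `MetadataKeys` class used in the snippets that follow is not shown in the question. A minimal sketch of what it might look like, assuming the header is a plain ASCII string named `trace-id` (the header name is a guess, not taken from the original code):

```java
import io.grpc.Metadata;

public final class MetadataKeys {
    // Assumption: "trace-id" is a hypothetical header name. gRPC header names
    // must be lowercase, and ASCII keys must not end in "-bin".
    public static final Metadata.Key<String> TRACE_ID_METADATA_KEY =
            Metadata.Key.of("trace-id", Metadata.ASCII_STRING_MARSHALLER);

    private MetadataKeys() {}

    // Small usage example: build a Metadata object carrying one trace id.
    public static Metadata withTraceId(String traceId) {
        Metadata metadata = new Metadata();
        metadata.put(TRACE_ID_METADATA_KEY, traceId);
        return metadata;
    }
}
```

`withTraceId` is only a convenience added for illustration; the question builds the `Metadata` object inline instead.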

@Test
public void greetHelloWorld() {
    String traceId = UUID.randomUUID().toString();
    Metadata metadata = new Metadata();
    metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);

    Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(traceId).build();

    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext(true).build();

    AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
    AtomicReference<Metadata> headersCapture = new AtomicReference<>();
    ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
    ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);

    GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
    GreeterServiceStub asyncStub = GreeterServiceGrpc.newStub(channel);

    try {
        Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloWorld(greeterRequest);
        String idInResponse = greeterReply.getId();
        String idInHeaders = headersCapture.get().get(MetadataKeys.TRACE_ID_METADATA_KEY);
        logger.info("Response from HelloService and WorldService  -- {}, id = {}, headers = {}", greeterReply.getMessage(), idInResponse, idInHeaders);
        assertEquals("Ids in response and header did not match", idInResponse, idInHeaders);
    } catch (StatusRuntimeException e) {
        logger.warn("Exception when calling HelloService and WorldService\n" +  e);
        fail();
    } finally {
        channel.shutdown();
    }

}

The ServerInterceptor implementation:

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

    String traceId = headers.get(MetadataKeys.TRACE_ID_METADATA_KEY);
    logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 1=" + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
    return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata headers) {
            headers.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);
            logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 2  " + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
            super.sendHeaders(headers);
        }

        @Override
        public void sendMessage(RespT message) {
            logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " message=" + message.toString());
            super.sendMessage(message);
        }
    }, headers);
}

Here is the implementation of the greetWithHelloWorld() method:

public void greetWithHelloWorld(com.comcast.manitoba.world.hello.Greeter.GreeterRequest request,
                                    io.grpc.stub.StreamObserver<com.comcast.manitoba.world.hello.Greeter.GreeterReply> responseObserver) {
        Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();

        Metadata metadata = new Metadata();
        metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, "");

        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8081)
                .usePlaintext(true).build();

        AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
        AtomicReference<Metadata> headersCapture = new AtomicReference<>();
        ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
        ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);

        GreeterServiceGrpc.GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));

        String messageFromWorldService = "";
        String replyIdFromWorldService = "";
        try {
            Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloOrWorld(greeterRequest);
            messageFromWorldService = greeterReply.getMessage();
            replyIdFromWorldService = greeterReply.getId();
            logger.info("Response from WorldService  -- {}, id = {}", messageFromWorldService, replyIdFromWorldService);
        } catch (StatusRuntimeException e) {
            logger.warn("Exception when calling WorldService\n" +  e);
        }

        Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService).setId(replyIdFromWorldService).build();
        responseObserver.onNext(greeterReply);
        responseObserver.onCompleted();
    }
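
The problem is that inside the greetWithHelloWorld() method I have no access to the Metadata, so I cannot extract the traceId from the headers and attach it to the request to WorldServer. However, if I set a breakpoint in that method, I can see that the request object does contain the traceId, but it is private and inaccessible.

Any ideas on how to achieve this? Also, is this the best way to pass traceIds around? I found some references to using Context. What is the difference between Context and Metadata?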

Eri*_*son 5

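The intended approach is to use a ClientInterceptor and a ServerInterceptor. The client interceptor copies from Context into Metadata. The server interceptor copies from Metadata into Context. Use Contexts.interceptCall in the server interceptor to apply the Context to all callbacks.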
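A rough sketch of that pair of interceptors might look like the following. The class name `TraceIdInterceptors`, the nested class names, and the `trace-id` header and key names are assumptions for illustration; only the `io.grpc` types come from the library.

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public final class TraceIdInterceptors {
    // Assumption: hypothetical header name, standing in for MetadataKeys.TRACE_ID_METADATA_KEY.
    static final Metadata.Key<String> TRACE_ID_METADATA_KEY =
            Metadata.Key.of("trace-id", Metadata.ASCII_STRING_MARSHALLER);
    // In-process counterpart of the header; the name is only used for debugging.
    static final Context.Key<String> TRACE_ID_CONTEXT_KEY = Context.key("trace-id");

    private TraceIdInterceptors() {}

    /** Server side: copies the trace id from the incoming Metadata into the Context. */
    static final class ServerSide implements ServerInterceptor {
        @Override
        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
                ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
            String traceId = headers.get(TRACE_ID_METADATA_KEY);
            Context context = Context.current();
            if (traceId != null) {
                context = context.withValue(TRACE_ID_CONTEXT_KEY, traceId);
            }
            // Contexts.interceptCall attaches the context around every listener callback.
            return Contexts.interceptCall(context, call, headers, next);
        }
    }

    /** Client side: copies the trace id from the current Context into the outgoing Metadata. */
    static final class ClientSide implements ClientInterceptor {
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                    next.newCall(method, callOptions)) {
                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {
                    String traceId = TRACE_ID_CONTEXT_KEY.get(); // reads Context.current()
                    if (traceId != null) {
                        headers.put(TRACE_ID_METADATA_KEY, traceId);
                    }
                    super.start(responseListener, headers);
                }
            };
        }
    }
}
```

Under these assumptions, HelloServer would register `ServerSide` on its server and wrap its channel to WorldServer with `ClientSide` (for example via `ClientInterceptors.intercept`, as the question already does). The handler then never has to touch Metadata: the trace id travels through the Context on the handler's thread.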

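Metadata is for wire-level propagation. Context is for in-process propagation. Generally, an application should not need to interact with Metadata directly in Java.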
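To illustrate the in-process half on its own, here is a small sketch with no RPC involved. `ContextDemo` and the `trace-id` key name are made up for the example; `Context.keyWithDefault`, `withValue`, and `run` are part of `io.grpc.Context`.

```java
import io.grpc.Context;

public final class ContextDemo {
    // Hypothetical key; "none" is returned when no trace id has been attached.
    static final Context.Key<String> TRACE_ID = Context.keyWithDefault("trace-id", "none");

    // Any code running on the current thread can read the value without it
    // being passed as a method parameter.
    static String currentTraceId() {
        return TRACE_ID.get();
    }

    public static void main(String[] args) {
        System.out.println(currentTraceId()); // prints "none"

        // withValue creates a child context; run() attaches it for the duration
        // of the Runnable and restores the previous context afterwards.
        Context.current().withValue(TRACE_ID, "abc-123").run(
                () -> System.out.println(currentTraceId())); // prints "abc-123"

        System.out.println(currentTraceId()); // prints "none" again
    }
}
```

The value never leaves the process by itself; it only reaches another server when something, such as a client interceptor, copies it into Metadata.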