Ume*_*ooq 7 java android server grpc grpc-java
我正在使用 GRPC 开发一个聊天应用程序,其中服务器从客户端接收信息并将其发送回所有连接到它的客户端。为此,我使用了Saturnism 的 聊天示例作为参考。我复制了代码,代码编译并运行,但服务器应该从未收到来自客户端的任何请求。
我的问题是:
文库服务器.java
public class WingokuServer {
public static void main(String[] args) throws IOException, InterruptedException {
Server server = ServerBuilder.forPort(8091)
.intercept(recordRequestHeadersInterceptor())
.addService(new WingokuServiceImpl())
.build();
System.out.println("Starting server...");
server.start();
System.out.println("Server started!");
server.awaitTermination();
}
Run Code Online (Sandbox Code Playgroud)
WinokuServerSideService实现:
public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase {
private static Set<StreamObserver<Response>> observers =
Collections.newSetFromMap(new ConcurrentHashMap<>());
public WingokuServiceImpl() {
System.out.println("WingokuServiceImp");
}
@Override
public StreamObserver<Request> messages(StreamObserver<Response> responseObserver) {
System.out.println("messages");
observers.add(responseObserver);
return new StreamObserver<Request>() {
@Override
public void onNext(Request request) {
System.out.println("Server onNext: ");
System.out.println("request from client is: "+ request.getRequestMessage());
Response response = Response.newBuilder().setResponseMessage("new Message From server at time: "+ System.nanoTime()).build();
for (StreamObserver<Response> observer : observers) {
observer.onNext(response);
}
}
@Override
public void onError(Throwable throwable) {
System.out.println("Server onError: ");
throwable.printStackTrace();
}
@Override
public void onCompleted() {
observers.remove(responseObserver);
System.out.println("Server onCompleted ");
}
};
}
}
Run Code Online (Sandbox Code Playgroud)
文库客户端:
public class WingokuClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>() {
@Override
public void onNext(Response response) {
System.out.println("Client onNext");
System.out.println("REsponse from server is: "+ response.getResponseMessage());
}
@Override
public void onError(Throwable throwable) {
System.out.println("Client onError");
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Client OnComplete");
}
});
requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
requestStreamObserver.onCompleted();
channel.shutdown();
System.out.println("exiting client");
}
}
Run Code Online (Sandbox Code Playgroud)
编辑:
代码没有任何问题。有用。我只需要将 awaitTermination 添加到客户端的通道,因为没有它只会立即关闭客户端和服务器之间的连接,甚至可能在请求从客户端发送到网络之前。这就是服务器从未收到任何请求的原因。
然而,我关于启用详细日志记录和/或向服务器端添加某种拦截器的问题仍未得到解答。所以我期待着从这里的专家那里得到一些指点。
小智 8
时隔多年,让我也回答一下这个问题(希望对遇到同样问题的人有用)。我基本上通过以Shoohei的响应为例并尝试尽可能压缩它来解决。
服务器拦截器
public class ServerLogInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
ServerCall<ReqT, RespT> listener = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
log.debug("Sending message to cliens: {}", message);
super.sendMessage(message);
}
};
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(listener, headers)) {
@Override
public void onMessage(ReqT message) {
log.debug("Received message from cliens: {}", message);
super.onMessage(message);
}
};
}}
Run Code Online (Sandbox Code Playgroud)
客户端拦截器
public class ClientLogInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next
) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void sendMessage(ReqT message) {
log.debug("Sending message to modules: {}", message);
super.sendMessage(message);
}
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
log.debug("Received message from modules: {}", message);
super.onMessage(message);
}
}, headers);
}
};
}
}
Run Code Online (Sandbox Code Playgroud)
(我不确定是否正确粘贴了代码,以防添加或删除一些括号)
我找到了一种使用拦截器在服务器端和客户端记录请求和响应的方法,它使代码更清晰。也可以使用侦探进行跟踪。
请使用弹簧:
implementation 'io.github.lognet:grpc-spring-boot-starter'
Run Code Online (Sandbox Code Playgroud)
服务器部分
然后您可以使用 GRpcGlobalInterceptor 注释
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
@GRpcGlobalInterceptor
public class GrpcInterceptor implements ServerInterceptor {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);
@Override
public <M, R> ServerCall.Listener<M> interceptCall(
ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next) {
String traceId = headers.get(TRACE_ID_KEY);
// TODO: Add traceId to sleuth
logger.warn("traceId from client: {}. TODO: Add traceId to sleuth", traceId);
GrpcServerCall grpcServerCall = new GrpcServerCall(call);
ServerCall.Listener listener = next.startCall(grpcServerCall, headers);
return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener) {
@Override
public void onMessage(M message) {
logger.info("Method: {}, Message: {}", methodName, message);
super.onMessage(message);
}
};
}
private class GrpcServerCall<M, R> extends ServerCall<M, R> {
ServerCall<M, R> serverCall;
protected GrpcServerCall(ServerCall<M, R> serverCall) {
this.serverCall = serverCall;
}
@Override
public void request(int numMessages) {
serverCall.request(numMessages);
}
@Override
public void sendHeaders(Metadata headers) {
serverCall.sendHeaders(headers);
}
@Override
public void sendMessage(R message) {
logger.info("Method: {}, Response: {}", serverCall.getMethodDescriptor().getFullMethodName(), message);
serverCall.sendMessage(message);
}
@Override
public void close(Status status, Metadata trailers) {
serverCall.close(status, trailers);
}
@Override
public boolean isCancelled() {
return serverCall.isCancelled();
}
@Override
public MethodDescriptor<M, R> getMethodDescriptor() {
return serverCall.getMethodDescriptor();
}
}
private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M> {
String methodName;
protected GrpcForwardingServerCallListener(MethodDescriptor method, ServerCall.Listener<M> listener) {
super(listener);
methodName = method.getFullMethodName();
}
}
}
Run Code Online (Sandbox Code Playgroud)
客户端部分
拦截器:
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
@Component
public class BackendInterceptor implements ClientInterceptor {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);
@Override
public <M, R> ClientCall<M, R> interceptCall(
final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next) {
return new BackendForwardingClientCall<M, R>(method,
next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS))) {
@Override
public void sendMessage(M message) {
logger.info("Method: {}, Message: {}", methodName, message);
super.sendMessage(message);
}
@Override
public void start(Listener<R> responseListener, Metadata headers) {
// TODO: Use the sleuth traceId instead of 999
headers.put(TRACE_ID_KEY, "999");
BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
super.start(backendListener, headers);
}
};
}
private class BackendListener<R> extends ClientCall.Listener<R> {
String methodName;
ClientCall.Listener<R> responseListener;
protected BackendListener(String methodName, ClientCall.Listener<R> responseListener) {
super();
this.methodName = methodName;
this.responseListener = responseListener;
}
@Override
public void onMessage(R message) {
logger.info("Method: {}, Response: {}", methodName, message);
responseListener.onMessage(message);
}
@Override
public void onHeaders(Metadata headers) {
responseListener.onHeaders(headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
responseListener.onClose(status, trailers);
}
@Override
public void onReady() {
responseListener.onReady();
}
}
private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R> {
String methodName;
protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall delegate) {
super(delegate);
methodName = method.getFullMethodName();
}
}
}
Run Code Online (Sandbox Code Playgroud)
将拦截器添加到通道:
ManagedChannel managedChannel = ManagedChannelBuilder
.forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();
Run Code Online (Sandbox Code Playgroud)
小智 2
您可以在 Netty 传输中打开帧日志记录。首先,创建一个名为logging.properties. 在文件中放入以下内容:
handlers=java.util.logging.ConsoleHandler
io.grpc.netty.level=FINE
java.util.logging.ConsoleHandler.level=FINE
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
Run Code Online (Sandbox Code Playgroud)
然后使用 jvm 标志启动 Java 二进制文件
-Djava.util.logging.config.file=logging.properties