我一直在尝试在服务器流拦截器上设置元数据,以便实际的 RPC 函数可以在下游读取它们:
// UserIDInterceptor is a stream server interceptor that tries to hand an
// "X-User-Id" value to the RPC handler by calling SendHeader on the stream.
//
// NOTE(review): per the grpc-go ServerStream documentation, SendHeader sends
// header metadata to the client; it does not change the incoming metadata
// stored in ss.Context(), so a handler calling metadata.FromIncomingContext
// will not see this value. The error returned by SendHeader is also discarded.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Sends the header towards the client; the stream's context is left untouched.
ss.SendHeader(metadata.New(map[string]string{"X-User-Id": "real_user_id"}))
// Invoke the actual RPC handler with the original, unmodified stream.
return handler(srv, ss)
}
// GetObjects is the streaming RPC handler. It looks up the "X-User-Id" entry
// in the incoming metadata of the stream's context, logs the value together
// with whether it was found, and always returns nil.
func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
	// The stream's context is the only place the handler can read request
	// metadata from.
	userID, ok := HeaderFromMetadata(client.Context(), "X-User-Id")
	log.Printf("User ID: %s, Ok: %t\n", userID, ok)
	return nil
}
// HeaderFromMetadata returns the first value of the first header in headers
// that has at least one value in the incoming metadata of ctx.
//
// Headers are tried in the order given. The boolean result is false when ctx
// carries no incoming metadata or none of the headers has a value; the string
// result is then empty.
func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
	md, found := metadata.FromIncomingContext(ctx)
	if !found {
		return "", false
	}

	for i := range headers {
		values := md.Get(headers[i])
		if len(values) == 0 {
			// Header missing or empty: try the next candidate.
			continue
		}
		return values[0], true
	}

	return "", false
}
Run Code Online (Sandbox Code Playgroud)
我的服务器是这样注册的:
server := grpc.NewServer(
grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是找不到用户 ID 标头。我可以看到当客户端发送请求时拦截器被调用,并且我可以看到元数据包含标头,但实际的RPC似乎无法提取它。我在这里做错了什么?
更新
一个更简单的解决方案是重写 grpc.ServerStream 的 Context() 方法:
// serverStream wraps a grpc.ServerStream and replaces the context it reports.
// Every other ServerStream method is promoted from the embedded stream.
type serverStream struct {
	grpc.ServerStream
	// ctx is the context returned by Context instead of the embedded stream's.
	ctx context.Context
}

// Context returns the context stored in the wrapper, shadowing the embedded
// stream's Context method.
func (stream *serverStream) Context() context.Context {
	return stream.ctx
}
// UserIDInterceptor is a stream server interceptor that injects the
// "X-User-Id" entry into the incoming metadata seen by the RPC handler.
//
// It derives a new context carrying the augmented metadata and passes the
// handler a serverStream whose Context method returns that context. The
// returned error is whatever handler returns.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Start from an empty MD so the user ID is attached even when the context
	// carries no incoming metadata (writing to a nil MD would panic).
	md := metadata.MD{}
	if incoming, ok := metadata.FromIncomingContext(ctx); ok {
		// Work on a copy so the metadata attached to the original context is
		// never mutated.
		md = incoming.Copy()
	}

	// Set rather than Append: any client-supplied value for this key is
	// replaced, so readers that take the first value get the server's one.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, &serverStream{ss, metadata.NewIncomingContext(ctx, md)})
}
Run Code Online (Sandbox Code Playgroud)
更新
另一种简单的解决方案是为 grpc.ServerStream 定义一个包装器,如下所示:
// serverStreamWrapper adapts a grpc.ServerStream so that Context reports a
// caller-chosen context. All other methods forward to the wrapped stream.
type serverStreamWrapper struct {
	// ss is the stream every call other than Context is forwarded to.
	ss grpc.ServerStream
	// ctx is the context reported by Context.
	ctx context.Context
}

// Context returns the context stored in the wrapper.
func (s serverStreamWrapper) Context() context.Context {
	return s.ctx
}

// RecvMsg forwards to the wrapped stream's RecvMsg.
func (s serverStreamWrapper) RecvMsg(m interface{}) error {
	return s.ss.RecvMsg(m)
}

// SendMsg forwards to the wrapped stream's SendMsg.
func (s serverStreamWrapper) SendMsg(m interface{}) error {
	return s.ss.SendMsg(m)
}

// SendHeader forwards to the wrapped stream's SendHeader.
func (s serverStreamWrapper) SendHeader(header metadata.MD) error {
	return s.ss.SendHeader(header)
}

// SetHeader forwards to the wrapped stream's SetHeader.
func (s serverStreamWrapper) SetHeader(header metadata.MD) error {
	return s.ss.SetHeader(header)
}

// SetTrailer forwards to the wrapped stream's SetTrailer.
func (s serverStreamWrapper) SetTrailer(trailer metadata.MD) {
	s.ss.SetTrailer(trailer)
}
// UserIDInterceptor is a stream server interceptor that injects the
// "X-User-Id" entry into the incoming metadata seen by the RPC handler.
//
// It derives a new context carrying the augmented metadata and passes the
// handler a serverStreamWrapper that reports that context. The returned error
// is whatever handler returns.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Start from an empty MD so the user ID is attached even when the context
	// carries no incoming metadata (writing to a nil MD would panic).
	md := metadata.MD{}
	if incoming, ok := metadata.FromIncomingContext(ctx); ok {
		// Work on a copy so the metadata attached to the original context is
		// never mutated.
		md = incoming.Copy()
	}

	// Set rather than Append: any client-supplied value for this key is
	// replaced, so readers that take the first value get the server's one.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, serverStreamWrapper{ss, metadata.NewIncomingContext(ctx, md)})
}
Run Code Online (Sandbox Code Playgroud)
您可以使用 metadata.NewIncomingContext,基于流的当前上下文创建一个携带新元数据的上下文副本。
由于 grpc.ServerStream 没有提供设置 context 的方法,为了把新的上下文设置回 ServerStream,这里定义了一个带有 context.Context 字段的 wrappedStream,并通过 SetContext 方法来设置该 context.Context:
// wrappedStream embeds a grpc.ServerStream and carries a replaceable context
// alongside it.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is the context stored by SetContext.
	ctx context.Context
}

// SetContext stores ctx in the wrapper, replacing any previously stored one.
func (s *wrappedStream) SetContext(ctx context.Context) {
	s.ctx = ctx
}
Run Code Online (Sandbox Code Playgroud)
完整示例代码
// wrappedStream embeds a grpc.ServerStream and overrides its context with one
// that can be replaced through SetContext.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is the context reported by Context and replaced by SetContext.
	ctx context.Context
}

// Context returns the context stored in the wrapper, shadowing the embedded
// stream's Context method.
func (s *wrappedStream) Context() context.Context {
	return s.ctx
}

// SetContext stores ctx as the context that Context will return.
func (s *wrappedStream) SetContext(ctx context.Context) {
	s.ctx = ctx
}

// RecvMsg forwards to the embedded stream's RecvMsg unchanged.
func (s *wrappedStream) RecvMsg(msg interface{}) error {
	return s.ServerStream.RecvMsg(msg)
}

// SendMsg forwards to the embedded stream's SendMsg unchanged.
func (s *wrappedStream) SendMsg(msg interface{}) error {
	return s.ServerStream.SendMsg(msg)
}
// StreamContextWrapper is a grpc.ServerStream that additionally allows its
// context to be replaced.
type StreamContextWrapper interface {
grpc.ServerStream
// SetContext stores the context to associate with the stream.
SetContext(context.Context)
}
// newStreamContextWrapper wraps ss in a wrappedStream whose stored context is
// initialised to ss.Context(), and returns it as a StreamContextWrapper.
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	wrapped := &wrappedStream{
		ServerStream: ss,
		ctx:          ss.Context(),
	}
	return wrapped
}
// UserIDInterceptor is a stream server interceptor that injects the
// "X-User-Id" entry into the incoming metadata seen by the RPC handler.
//
// It derives a new context carrying the augmented metadata, stores it on a
// StreamContextWrapper around ss, and passes that wrapper to handler. The
// returned error is whatever handler returns.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Start from an empty MD so the user ID is attached even when the context
	// carries no incoming metadata (writing to a nil MD would panic).
	md := metadata.MD{}
	if incoming, ok := metadata.FromIncomingContext(ctx); ok {
		// Work on a copy so the metadata attached to the original context is
		// never mutated.
		md = incoming.Copy()
	}

	// Set rather than Append: any client-supplied value for this key is
	// replaced, so readers that take the first value get the server's one.
	md.Set("X-User-Id", "real_user_id")

	sw := newStreamContextWrapper(ss)
	sw.SetContext(metadata.NewIncomingContext(ctx, md))
	return handler(srv, sw)
}
Run Code Online (Sandbox Code Playgroud)