TLDR:我正在寻找一种方法来更新每次调用的开放流上的标头,而stream.Send(msg)
无需关闭流并打开一个新流。
概括
我有一个 GRPC 客户端和服务器来处理双向流。要向服务器进行身份验证,客户端必须在请求标头中发送 JWT,并将其设置为“授权”。令牌的有效期为 30 分钟。令牌过期后,服务器将终止连接。
我正在寻找一种方法来刷新客户端的授权令牌,并保持流打开。客户端应在循环中运行,每 30 分钟使用更新的令牌和更新的负载执行一个新请求。我还没有找到从客户端更新已打开流的标头的方法。
让我们看一些代码来了解客户端的样子。下面的代码有一个函数用于创建客户端的新实例,另一个函数用于建立与 GRPC 服务器的连接。
func NewWatchClient(config *Config, logger *logrus.Logger) (*WatchClient, error) {
cc, err := newConnection(config, logger)
if err != nil {
return nil, err
}
service := proto.NewWatchServiceClient(cc)
return &WatchClient{
config: config,
conn: cc,
logger: entry,
service: service,
}, nil
}
func newConnection(config *Config, logger *logrus.Logger) (*grpc.ClientConn, error) {
address := fmt.Sprintf("%s:%d", config.Host, config.Port)
// rpcCredential implements credentials.PerRPCCredentials
rpcCredential := newTokenAuth(config.Auth, config.TenantID)
return grpc.Dial(
address,
grpc.WithPerRPCCredentials(rpcCredential),
)
}
Run Code Online (Sandbox Code Playgroud)
查看newConnection
上面的函数,我可以看到调用了另一个函数 来newTokenAuth
创建身份验证令牌。此函数返回一个实现PerRPCCredentials接口的结构。
有两种方法可以设置请求的授权。
使用grpc.WithPerRPCCredentials在创建与服务器的连接时添加授权。
使用grpc.PerRPCCredentials将授权添加到在服务器连接上打开的每个流。
在本例中,我grpc.WithPerRPCCredentials
在创建与服务器的连接时附加令牌。
现在,让我们看一下PerRPCCredentials的定义。
type PerRPCCredentials interface {
// GetRequestMetadata gets the current request metadata, refreshing
// tokens if required. This should be called by the transport layer on
// each request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status
// for the RPC. uri is the URI of the entry point for the request.
// When supported by the underlying implementation, ctx can be used for
// timeout and cancellation. Additionally, RequestInfo data will be
// available via ctx to this call.
// TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
// RequireTransportSecurity indicates whether the credentials requires
// transport security.
RequireTransportSecurity() bool
}
Run Code Online (Sandbox Code Playgroud)
该接口要求您定义两个方法。的文档GetRequestMetadata
说
GetRequestMetadata 获取当前请求元数据,如果需要刷新令牌
因此,看起来我的实现PerRPCCredentials
应该能够处理我的流或连接的令牌刷新。我们来看看我的实现PerRPCCredentials
。
// tokenAuth implements the PerRPCCredentials interface
type tokenAuth struct {
tenantID string
tokenRequester auth.PlatformTokenGetter
token string
}
// RequireTransportSecurity leave as false for now
func (tokenAuth) RequireTransportSecurity() bool {
return false
}
// GetRequestMetadata sets the http header prior to transport
func (t tokenAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
token, err := t.tokenRequester.GetToken()
if err != nil {
return nil, err
}
t.token = token
go func() {
time.Sleep(25 * time.Minute)
token, _ := t.tokenRequester.GetToken()
t.token = token
}()
return map[string]string{
"tenant-id": t.tenantID,
"authorization": "Bearer " + t.token,
}, nil
}
Run Code Online (Sandbox Code Playgroud)
如您所见,调用GetRequestMetadata
将建立一个 go 例程,该例程将尝试每 25 分钟刷新一次令牌。在这里添加 go 例程可能不是正确的方法。这是尝试刷新 auth 标头,但这不起作用。
让我们看一下流。
func (w WatchClient) CreateWatch() error {
topic := &proto.Request{SelfLink: w.config.TopicSelfLink}
stream, err := w.service.CreateWatch(context.Background())
if err != nil {
return err
}
for {
err = stream.Send(topic)
if err != nil {
return err
}
time.Sleep(25 * time.Minute)
}
}
Run Code Online (Sandbox Code Playgroud)
客户端每 25 分钟在流上发送一条消息。我想要到达这里的是,当stream.Send
被调用时,更新的令牌也会被发送。
GetRequestMetadata
无论我是否通过设置身份验证,此函数都只会被调用一次grpc.WithPerRPCCredentials
,grpc.PerRPCCredsCallOption
因此似乎无法更新授权标头。
如果您知道我在尝试使用令牌刷新时错过了什么,PerRPCCredentials
请告诉我。
谢谢。