我有一个应用程序(我们称之为客户端)通过 gRPC 连接到同一台计算机上的另一个进程(我们称之为服务器)。通信通过 unix 套接字进行。
如果服务器重新启动,我的客户端会收到一个EOF连接,并且不会重新建立连接,尽管我希望它clientConn能够自动处理重新连接。
为什么拨号器不处理重新连接?我希望它能通过我传递的退避参数来实现这一点。
下面是一些伪MWE。
Run建立初始连接,然后生成goroutineOnegoroutineOne等待连接准备好并委托send给fooUpdaterfooUpdater流式传输数据,或在出现错误时返回waitUntilReady引用的伪代码来获取新流。func main() {
go func() {
if err := Run(ctx); err != nil {
log.Errorf("connection error: %v", err)
}
ctxCancel()
}()
// some wait logic
}
func Run(ctx context.Context) {
backoffConfig := backoff.Config{
BaseDelay: time.Duration(1 * time.Second),
Multiplier: backoff.DefaultConfig.Multiplier,
Jitter: backoff.DefaultConfig.Jitter,
MaxDelay: time.Duration(120 * time.Second),
}
myConn, err := grpc.DialContext(ctx,
"/var/run/foo.bar",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffConfig, MinConnectTimeout: time.Duration(1 * time.Second)}),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
c, err := d.DialContext(ctx, "unix", addr)
if err != nil {
return nil, fmt.Errorf("connection to unix://%s failed: %w", addr, err)
}
return c, nil
}),
)
if err != nil {
return fmt.Errorf("could not establish socket for foo: %w", err)
}
defer myConn.Close()
return goroutineOne()
}
func goroutineOne() {
reconnect := make(chan struct{})
for {
if ready := waitUntilReady(ctx, myConn, time.Duration(2*time.Minute)); !ready {
return fmt.Errorf("myConn: %w, timeout: %s", ErrWaitReadyTimeout, "2m")
}
go func() {
if err := fooUpdater(ctx, dataBuffer, myConn); err != nil {
log.Errorf("foo updater: %v", err)
}
reconnect <- struct{}{}
}()
select {
case <-ctx.Done():
return nil
case <-reconnect:
}
}
}
func fooUpdater(ctx context.Context, dataBuffer custom.CircularBuffer, myConn *grpc.ClientConn) error {
clientStream, err := myConn.Stream(ctx) // custom pb code, returns grpc.ClientConn.NewStream(...)
if err != nil {
return fmt.Errorf("could not obtain stream: %w", err)
}
for {
select {
case <-ctx.Done():
return nil
case data := <-dataBuffer:
if err := clientStream.Send(data); err != nil {
return fmt.Errorf("could not send data: %w", err)
}
}
}
}
func waitUntilReady(ctx context.Context, conn *grpc.ClientConn, maxTimeout time.Duration) bool {
ctx, cancel := context.WithTimeout(ctx, maxTimeout)
defer cancel()
currentState := conn.GetState()
timeoutValid := true
for currentState != connectivity.Ready && timeoutValid {
timeoutValid = conn.WaitForStateChange(ctx, currentState)
currentState = conn.GetState()
// debug print currentState -> prints IDLE
}
return currentState == connectivity.Ready
}
Run Code Online (Sandbox Code Playgroud)
也欢迎调试提示:)
ctx.Done根据提供的代码和信息,使用方式可能存在问题。
正在ctx.Done()使用fooUpdater和goroutineOne函数。当连接中断时,我相信这ctx.Done()两个函数都会被调用,执行顺序如下:
连接中断,函数ctx.Done中的 casefooUpdater被调用,退出函数。函数中的select语句goroutineOne也执行ctx.Donecase,该函数存在,客户端不会重新连接。
尝试调试它以检查两个 select case 块是否都被执行,但我相信这就是这里的问题。
| 归档时间: |
|
| 查看次数: |
674 次 |
| 最近记录: |