EOF 后自动 gRPC unix 重新连接

gta*_*atr 5 go grpc-go

我有一个应用程序(我们称之为客户端)通过 gRPC 连接到同一台计算机上的另一个进程(我们称之为服务器)。通信通过 unix 套接字进行。

如果服务器重新启动,我的客户端会收到一个EOF连接,并且不会重新建立连接,尽管我希望它clientConn能够自动处理重新连接。

为什么拨号器不处理重新连接?我希望它能通过我传递的退避参数来实现这一点。

下面是一些伪MWE。

  • Run建立初始连接,然后生成goroutineOne
  • goroutineOne等待连接准备好并委托sendfooUpdater
  • fooUpdater流式传输数据,或在出现错误时返回
  • 因为我使用了此答案waitUntilReady引用的伪代码来获取新流。
func main() {
    go func() {
        if err := Run(ctx); err != nil {
            log.Errorf("connection error: %v", err)
        }
        ctxCancel()
    }()
// some wait logic
}


func Run(ctx context.Context) {
    backoffConfig := backoff.Config{
        BaseDelay:  time.Duration(1 * time.Second),
        Multiplier: backoff.DefaultConfig.Multiplier,
        Jitter:     backoff.DefaultConfig.Jitter,
        MaxDelay:    time.Duration(120 * time.Second),
    }

    myConn, err := grpc.DialContext(ctx,
        "/var/run/foo.bar",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffConfig, MinConnectTimeout: time.Duration(1 * time.Second)}),
        grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
            d := net.Dialer{}
            c, err := d.DialContext(ctx, "unix", addr)
            if err != nil {
                return nil, fmt.Errorf("connection to unix://%s failed: %w", addr, err)
            }
            return c, nil
        }),
    )
    if err != nil {
        return fmt.Errorf("could not establish socket for foo: %w", err)
    }
    defer myConn.Close()
    return goroutineOne()
}

func goroutineOne() {
    reconnect := make(chan struct{})

    for {
        if ready := waitUntilReady(ctx, myConn, time.Duration(2*time.Minute)); !ready {
            return fmt.Errorf("myConn: %w, timeout: %s", ErrWaitReadyTimeout, "2m")
        }
        go func() {
            if err := fooUpdater(ctx, dataBuffer, myConn); err != nil {
                log.Errorf("foo updater: %v", err)
            }
            reconnect <- struct{}{}
        }()

        select {
        case <-ctx.Done():
            return nil
        case <-reconnect:
        }
    }
}

func fooUpdater(ctx context.Context, dataBuffer custom.CircularBuffer, myConn *grpc.ClientConn) error {
    clientStream, err := myConn.Stream(ctx) // custom pb code, returns grpc.ClientConn.NewStream(...)
    if err != nil {
        return fmt.Errorf("could not obtain stream: %w", err)
    }
    for {
        select {
        case <-ctx.Done():
            return nil
        case data := <-dataBuffer:
            if err := clientStream.Send(data); err != nil {
                return fmt.Errorf("could not send data: %w", err)
            }
        }
    }
}

func waitUntilReady(ctx context.Context, conn *grpc.ClientConn, maxTimeout time.Duration) bool {
    ctx, cancel := context.WithTimeout(ctx, maxTimeout)
    defer cancel()

    currentState := conn.GetState()
    timeoutValid := true

    for currentState != connectivity.Ready && timeoutValid {
        timeoutValid = conn.WaitForStateChange(ctx, currentState)
        currentState = conn.GetState()
        // debug print currentState -> prints IDLE
    }

    return currentState == connectivity.Ready
}
Run Code Online (Sandbox Code Playgroud)

也欢迎调试提示:)

Emi*_*vic 0

ctx.Done根据提供的代码和信息,使用方式可能存在问题。

正在ctx.Done()使用fooUpdatergoroutineOne函数。当连接中断时,我相信这ctx.Done()两个函数都会被调用,执行顺序如下:

连接中断,函数ctx.Done中的 casefooUpdater被调用,退出函数。函数中的select语句goroutineOne也执行ctx.Donecase,该函数存在,客户端不会重新连接。

尝试调试它以检查两个 select case 块是否都被执行,但我相信这就是这里的问题。