WaitGroup.Wait()的超时

lah*_*her 19 concurrency timeout go

WaitGroup.Wait()分配超时的惯用方法是什么?

我想这样做的原因是为了保护我的"调度员"免于永远等待一个错误的"工人".这导致了一些哲学问题(即一旦有错误的工人,系统如何可靠地继续?),但我认为这个问题超出了范围.

我有一个答案,我会提供.现在我已经把它写下来了,它看起来并不那么糟糕但它仍然感觉比它应该更复杂.我想知道是否有更简单,更惯用的东西,甚至是不使用WaitGroups的替代方法.

助教.

icz*_*cza 40

大多数情况下,您在下面发布的解决方案都是最好的解决方案.一些改进它的技巧:

  • 或者,您可以关闭通道以发信号通知完成而不是在其上发送值,封闭通道上的接收操作始终可以立即进行.
  • 并且最好使用defer语句来表示信号完成,即使函数突然终止也会执行它.
  • 此外,如果只有一个"作业"要等待,您可以完全省略WaitGroup,只需在作业完成时发送值或关闭通道(您在select语句中使用的通道相同).
  • 指定1秒的持续时间非常简单:timeout := time.Second.例如,指定2秒是:timeout := 2 * time.Second.你不需要转换,time.Second已经是类型time.Duration,将它与无类型常量相乘,2也会产生类型的值time.Duration.

我还将创建一个包含此功能的帮助器/实用程序功能.请注意,WaitGroup必须作为指针传递,否则副本将不会被"通知" WaitGroup.Done()调用.就像是:

// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    c := make(chan struct{})
    go func() {
        defer close(c)
        wg.Wait()
    }()
    select {
    case <-c:
        return false // completed normally
    case <-time.After(timeout):
        return true // timed out
    }
}
Run Code Online (Sandbox Code Playgroud)

使用它:

if waitTimeout(&wg, time.Second) {
    fmt.Println("Timed out waiting for wait group")
} else {
    fmt.Println("Wait group finished")
}
Run Code Online (Sandbox Code Playgroud)

Go Playground尝试一下.

  • 为什么这被接受为答案,因为如果 wg.Wait() 永远无法返回,则存在潜在的 goroutine 泄漏。 (3认同)
  • 如果`wg.Wait()`永不返回,它是否泄漏了goroutine? (2认同)

lah*_*her 7

我是这样的:http : //play.golang.org/p/eWv0fRlLEC

go func() {
    wg.Wait()
    c <- struct{}{}
}()
timeout := time.Duration(1) * time.Second
fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)
select {
case <-c:
    fmt.Printf("Wait group finished\n")
case <-time.After(timeout):
    fmt.Printf("Timed out waiting for wait group\n")
}
fmt.Printf("Free at last\n")
Run Code Online (Sandbox Code Playgroud)

它工作正常,但这是最好的方法吗?


Dmi*_*rov 6

大多数现有答案都表明存在泄漏 goroutine。为WaitGroup.Wait分配超时的惯用方法是使用底层同步/原子包原语。我从 @icza 答案中获取代码并使用该包重写它atomic,并添加了上下文取消,因为这是通知超时的惯用方式。

package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    var submitCount int32
    // run this instead of wg.Add(1)
    atomic.AddInt32(&submitCount, 1)

    // run this instead of wg.Done()
    // atomic.AddInt32(&submitCount, -1)

    timeout := time.Second
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)

    waitWithCtx(ctx, &submitCount)

    fmt.Println("Free at last")
}

// waitWithCtx returns when passed counter drops to zero
// or when context is cancelled
func waitWithCtx(ctx context.Context, counter *int32) {
    ticker := time.NewTicker(10 * time.Millisecond)
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if atomic.LoadInt32(counter) == 0 {
                return
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Go Playground 中的相同代码

  • 恕我直言,这实际上是一种反模式,因为它代表主动等待,即轮询 (3认同)