是否可以取消未完成的 goroutine?

rus*_*yhu 3 concurrency channel go goroutine

考虑一组检查工作,每个检查工作都有独立的逻辑,因此它们似乎很适合并发运行,例如:

type Work struct {
    // ...
}

// This Check could be quite time-consuming
func (w *Work) Check() bool {
    // return succeed or not

    //...
}

func CheckAll(works []*Work) {
    num := len(works)
    results := make(chan bool, num)
    for _, w := range works {
        go func(w *Work) {
            results <- w.Check()
        }(w)
    }

    for i := 0; i < num; i++ {
        if r := <-results; !r {
            ReportFailed()
            break;
        }
    }
}

func ReportFailed() {
    // ...
}
Run Code Online (Sandbox Code Playgroud)

当关注时results,如果逻辑是无论哪一项工作失败,我们都断言所有工作都完全失败,通道中剩余的值是无用的。让剩余未完成的 goroutine 继续运行并将结果发送到通道是没有意义和浪费的,尤其w.Check()是当相当耗时时。理想的效果类似于:

    for _, w := range works {
        if !w.Check() {
            ReportFailed()
            break;
        }
    }
Run Code Online (Sandbox Code Playgroud)

这仅运行必要的检查工作然后中断,但处于顺序非并发场景中。

那么,是否可以取消这些未完成的goroutines,或者发送到channel呢?

icz*_*cza 5

取消(阻塞)发送

您最初的问题询问如何取消发送操作。通道上的发送基本上是“即时”的。如果通道的缓冲区已满并且没有就绪的接收器,则通道上的发送将被阻塞。

可以使用select声明和cancel关闭的通道“取消”此发送,例如:

cancel := make(chan struct{})

select {
case ch <- value:
case <- cancel:
}
Run Code Online (Sandbox Code Playgroud)

使用另一个 goroutine关闭cancel通道close(cancel)将使上面的 select 放弃发送ch(如果它阻塞)。

但如上所述,发送是在“就绪”通道上“即时”进行的,并且发送首先评估要发送的值:

results <- w.Check()
Run Code Online (Sandbox Code Playgroud)

首先必须运行它w.Check(),一旦完成,它的返回值将被发送到results

取消函数调用

所以你真正需要的是取消w.Check()方法调用。为此,惯用的方法是传递一个context.Context可以取消的值,并且w.Check()它本身必须监视并“服从”此取消请求。

请参阅如果上下文被取消则终止函数执行

请注意,您的函数必须明确支持这一点。函数调用或 goroutine 不会隐式终止,请参阅取消 Go 中的阻塞操作

所以你的Check()应该看起来像这样:

// This Check could be quite time-consuming
func (w *Work) Check(ctx context.Context, workDuration time.Duration) bool {
    // Do your thing and monitor the context!

    select {
    case <-ctx.Done():
        return false
    case <-time.After(workDuration): // Simulate work
        return true
    case <-time.After(2500 * time.Millisecond): // Simulate failure after 2.5 sec
        return false
    }
}
Run Code Online (Sandbox Code Playgroud)

可能CheckAll()看起来像这样:

func CheckAll(works []*Work) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    num := len(works)
    results := make(chan bool, num)

    wg := &sync.WaitGroup{}
    for i, w := range works {
        workDuration := time.Second * time.Duration(i)
        wg.Add(1)
        go func(w *Work) {
            defer wg.Done()
            result := w.Check(ctx, workDuration)
            // You may check and return if context is cancelled
            // so result is surely not sent, I omitted it here.
            select {
            case results <- result:
            case <-ctx.Done():
                return
            }
        }(w)
    }

    go func() {
        wg.Wait()
        close(results) // This allows the for range over results to terminate
    }()

    for result := range results {
        fmt.Println("Result:", result)
        if !result {
            cancel()
            break
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

测试它:

CheckAll(make([]*Work, 10))
Run Code Online (Sandbox Code Playgroud)

输出(在Go Playground上尝试):

Result: true
Result: true
Result: true
Result: false
Run Code Online (Sandbox Code Playgroud)

我们true打印了 3 次(工作在 2.5 秒内完成),然后故障模拟开始,返回false,并终止所有其他作业。

请注意,sync.WaitGroup上面示例中的 并不是严格需要的,因为results有一个能够保存所有结果的缓冲区,但总的来说它仍然是一个很好的做法(如果您将来使用较小的缓冲区)。

请参阅相关:如果 go 中的一个 goroutine 发生错误,则关闭多个 goroutine