rus*_*yhu 3 concurrency channel go goroutine
考虑一组检查工作,每个检查工作都有独立的逻辑,因此它们似乎很适合并发运行,例如:
type Work struct {
// ...
}
// This Check could be quite time-consuming
func (w *Work) Check() bool {
// return succeed or not
//...
}
func CheckAll(works []*Work) {
num := len(works)
results := make(chan bool, num)
for _, w := range works {
go func(w *Work) {
results <- w.Check()
}(w)
}
for i := 0; i < num; i++ {
if r := <-results; !r {
ReportFailed()
break;
}
}
}
func ReportFailed() {
// ...
}
Run Code Online (Sandbox Code Playgroud)
当关注时results,如果逻辑是无论哪一项工作失败,我们都断言所有工作都完全失败,通道中剩余的值是无用的。让剩余未完成的 goroutine 继续运行并将结果发送到通道是没有意义和浪费的,尤其w.Check()是当相当耗时时。理想的效果类似于:
for _, w := range works {
if !w.Check() {
ReportFailed()
break;
}
}
Run Code Online (Sandbox Code Playgroud)
这仅运行必要的检查工作然后中断,但处于顺序非并发场景中。
那么,是否可以取消这些未完成的goroutines,或者发送到channel呢?
您最初的问题询问如何取消发送操作。通道上的发送基本上是“即时”的。如果通道的缓冲区已满并且没有就绪的接收器,则通道上的发送将被阻塞。
您可以使用select声明和cancel关闭的通道“取消”此发送,例如:
cancel := make(chan struct{})
select {
case ch <- value:
case <- cancel:
}
Run Code Online (Sandbox Code Playgroud)
使用另一个 goroutine关闭cancel通道close(cancel)将使上面的 select 放弃发送ch(如果它阻塞)。
但如上所述,发送是在“就绪”通道上“即时”进行的,并且发送首先评估要发送的值:
results <- w.Check()
Run Code Online (Sandbox Code Playgroud)
首先必须运行它w.Check(),一旦完成,它的返回值将被发送到results。
所以你真正需要的是取消w.Check()方法调用。为此,惯用的方法是传递一个context.Context可以取消的值,并且w.Check()它本身必须监视并“服从”此取消请求。
请注意,您的函数必须明确支持这一点。函数调用或 goroutine 不会隐式终止,请参阅取消 Go 中的阻塞操作。
所以你的Check()应该看起来像这样:
// This Check could be quite time-consuming
func (w *Work) Check(ctx context.Context, workDuration time.Duration) bool {
// Do your thing and monitor the context!
select {
case <-ctx.Done():
return false
case <-time.After(workDuration): // Simulate work
return true
case <-time.After(2500 * time.Millisecond): // Simulate failure after 2.5 sec
return false
}
}
Run Code Online (Sandbox Code Playgroud)
可能CheckAll()看起来像这样:
func CheckAll(works []*Work) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
num := len(works)
results := make(chan bool, num)
wg := &sync.WaitGroup{}
for i, w := range works {
workDuration := time.Second * time.Duration(i)
wg.Add(1)
go func(w *Work) {
defer wg.Done()
result := w.Check(ctx, workDuration)
// You may check and return if context is cancelled
// so result is surely not sent, I omitted it here.
select {
case results <- result:
case <-ctx.Done():
return
}
}(w)
}
go func() {
wg.Wait()
close(results) // This allows the for range over results to terminate
}()
for result := range results {
fmt.Println("Result:", result)
if !result {
cancel()
break
}
}
}
Run Code Online (Sandbox Code Playgroud)
测试它:
CheckAll(make([]*Work, 10))
Run Code Online (Sandbox Code Playgroud)
输出(在Go Playground上尝试):
Result: true
Result: true
Result: true
Result: false
Run Code Online (Sandbox Code Playgroud)
我们true打印了 3 次(工作在 2.5 秒内完成),然后故障模拟开始,返回false,并终止所有其他作业。
请注意,sync.WaitGroup上面示例中的 并不是严格需要的,因为results有一个能够保存所有结果的缓冲区,但总的来说它仍然是一个很好的做法(如果您将来使用较小的缓冲区)。
请参阅相关:如果 go 中的一个 goroutine 发生错误,则关闭多个 goroutine