lah*_*her 19 concurrency timeout go
为WaitGroup.Wait()分配超时的惯用方法是什么?
我想这样做的原因是为了保护我的"调度员"免于永远等待一个错误的"工人".这导致了一些哲学问题(即一旦有错误的工人,系统如何可靠地继续?),但我认为这个问题超出了范围.
我有一个答案,我会提供.现在我已经把它写下来了,它看起来并不那么糟糕但它仍然感觉比它应该更复杂.我想知道是否有更简单,更惯用的东西,甚至是不使用WaitGroups的替代方法.
助教.
icz*_*cza 40
大多数情况下,您在下面发布的解决方案都是最好的解决方案.一些改进它的技巧:
defer语句来表示信号完成,即使函数突然终止也会执行它.WaitGroup,只需在作业完成时发送值或关闭通道(您在select语句中使用的通道相同).timeout := time.Second.例如,指定2秒是:timeout := 2 * time.Second.你不需要转换,time.Second已经是类型time.Duration,将它与无类型常量相乘,2也会产生类型的值time.Duration.我还将创建一个包含此功能的帮助器/实用程序功能.请注意,WaitGroup必须作为指针传递,否则副本将不会被"通知" WaitGroup.Done()调用.就像是:
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
Run Code Online (Sandbox Code Playgroud)
使用它:
if waitTimeout(&wg, time.Second) {
fmt.Println("Timed out waiting for wait group")
} else {
fmt.Println("Wait group finished")
}
Run Code Online (Sandbox Code Playgroud)
在Go Playground尝试一下.
我是这样的:http : //play.golang.org/p/eWv0fRlLEC
go func() {
wg.Wait()
c <- struct{}{}
}()
timeout := time.Duration(1) * time.Second
fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)
select {
case <-c:
fmt.Printf("Wait group finished\n")
case <-time.After(timeout):
fmt.Printf("Timed out waiting for wait group\n")
}
fmt.Printf("Free at last\n")
Run Code Online (Sandbox Code Playgroud)
它工作正常,但这是最好的方法吗?
大多数现有答案都表明存在泄漏 goroutine。为WaitGroup.Wait分配超时的惯用方法是使用底层同步/原子包原语。我从 @icza 答案中获取代码并使用该包重写它atomic,并添加了上下文取消,因为这是通知超时的惯用方式。
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
)
func main() {
var submitCount int32
// run this instead of wg.Add(1)
atomic.AddInt32(&submitCount, 1)
// run this instead of wg.Done()
// atomic.AddInt32(&submitCount, -1)
timeout := time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)
waitWithCtx(ctx, &submitCount)
fmt.Println("Free at last")
}
// waitWithCtx returns when passed counter drops to zero
// or when context is cancelled
func waitWithCtx(ctx context.Context, counter *int32) {
ticker := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if atomic.LoadInt32(counter) == 0 {
return
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
11423 次 |
| 最近记录: |