ger*_*rad 9 channel go goroutine
我有一个简单的并发用例,它让我疯狂,我无法找到一个优雅的解决方案.任何帮助,将不胜感激.
我想编写一种方法fetchAll
,从远程服务器并行查询未指定数量的资源.如果任何提取失败,我想立即返回第一个错误.
我最初的,天真的实现,泄漏了goroutines:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func fetchAll() error {
wg := sync.WaitGroup{}
errs := make(chan error)
leaks := make(map[int]struct{})
defer fmt.Println("these goroutines leaked:", leaks)
// run all the http requests in parallel
for i := 0; i < 4; i++ {
leaks[i] = struct{}{}
wg.Add(1)
go func(i int) {
defer wg.Done()
defer delete(leaks, i)
// pretend this does an http request and returns an error
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
errs <- fmt.Errorf("goroutine %d's error returned", i)
}(i)
}
// wait until all the fetches are done and close the error
// channel so the loop below terminates
go func() {
wg.Wait()
close(errs)
}()
// return the first error
for err := range errs {
if err != nil {
return err
}
}
return nil
}
func main() {
fmt.Println(fetchAll())
}
Run Code Online (Sandbox Code Playgroud)
游乐场:https://play.golang.org/p/Be93J514R5
我从阅读https://blog.golang.org/pipelines了解到,我可以创建一个信号通道来清理其他线程.或者,我可能会用context
它来完成它.但似乎这样一个简单的用例应该有一个我想念的更简单的解决方案.
jot*_*oth 10
使用错误组使此过程变得更加简单。这将自动等待所有提供的Go例程成功完成,或者在任何一个例程返回错误的情况下取消所有剩余的Go例程(在这种情况下,该错误是返回给调用者的一个气泡)。
package main
import (
"context"
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
func fetchAll(ctx context.Context) error {
errs, ctx := errgroup.WithContext(ctx)
// run all the http requests in parallel
for i := 0; i < 4; i++ {
errs.Go(func() error {
// pretend this does an http request and returns an error
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return fmt.Errorf("goroutine %d's error returned", i)
})
}
// Wait for completion and return the first error (if any)
return errs.Wait()
}
func main() {
fmt.Println(fetchAll(context.Background()))
}
Run Code Online (Sandbox Code Playgroud)
除了你的一个goroutine之外,其他所有人都被泄露了,因为他们还在等待发送到errs频道 - 你永远不会完成清空它的范围.你也在泄漏goroutine,他的工作就是关闭errs通道,因为waitgroup永远不会完成.
(另外,正如Andy指出的那样,从地图中删除不是线程安全的,因此需要保护互斥锁.)
但是,我认为这里甚至不需要地图,互斥体,等待组,上下文等.我会重写整个事情,只使用基本的通道操作,如下所示:
package main
import (
"fmt"
"math/rand"
"time"
)
func fetchAll() error {
var N = 4
quit := make(chan bool)
errc := make(chan error)
done := make(chan error)
for i := 0; i < N; i++ {
go func(i int) {
// dummy fetch
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
err := error(nil)
if rand.Intn(2) == 0 {
err = fmt.Errorf("goroutine %d's error returned", i)
}
ch := done // we'll send to done if nil error and to errc otherwise
if err != nil {
ch = errc
}
select {
case ch <- err:
return
case <-quit:
return
}
}(i)
}
count := 0
for {
select {
case err := <-errc:
close(quit)
return err
case <-done:
count++
if count == N {
return nil // got all N signals, so there was no error
}
}
}
}
func main() {
rand.Seed(time.Now().UnixNano())
fmt.Println(fetchAll())
}
Run Code Online (Sandbox Code Playgroud)
游乐场链接:https://play.golang.org/p/mxGhSYYkOb
编辑:确实有一个愚蠢的错误,谢谢你的指出.我修改了上面的代码(我认为......).我还为添加的Realism™添加了一些随机性.
另外,我想强调的是,确实有多种方法可以解决这个问题,而我的解决方案只有一种方法.归根结底,它归结为个人品味,但总的来说,您希望努力实现"惯用"代码 - 并为您带来自然且易于理解的风格.
这是使用joth建议的errgroup的更完整示例。它显示处理成功的数据,并在第一个错误时退出。
https://play.golang.org/p/rU1v-Mp2ijo
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"math/rand"
"time"
)
func fetchAll() error {
g, ctx := errgroup.WithContext(context.Background())
results := make(chan int)
for i := 0; i < 4; i++ {
current := i
g.Go(func() error {
// Simulate delay with random errors.
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
if rand.Intn(2) == 0 {
return fmt.Errorf("goroutine %d's error returned", current)
}
// Pass processed data to channel, or receive a context completion.
select {
case results <- current:
return nil
// Close out if another error occurs.
case <-ctx.Done():
return ctx.Err()
}
})
}
// Elegant way to close out the channel when the first error occurs or
// when processing is successful.
go func() {
g.Wait()
close(results)
}()
for result := range results {
fmt.Println("processed", result)
}
// Wait for all fetches to complete.
return g.Wait()
}
func main() {
fmt.Println(fetchAll())
}
Run Code Online (Sandbox Code Playgroud)