惯用的goroutine终止和错误处理

ger*_*rad 9 channel go goroutine

我有一个简单的并发用例,它让我疯狂,我无法找到一个优雅的解决方案.任何帮助,将不胜感激.

我想编写一种方法fetchAll,从远程服务器并行查询未指定数量的资源.如果任何提取失败,我想立即返回第一个错误.

我最初的,天真的实现,泄漏了goroutines:

package main

import (
  "fmt"
  "math/rand"
  "sync"
  "time"
)

func fetchAll() error {
  wg := sync.WaitGroup{}
  errs := make(chan error)
  leaks := make(map[int]struct{})
  defer fmt.Println("these goroutines leaked:", leaks)

  // run all the http requests in parallel
  for i := 0; i < 4; i++ {
    leaks[i] = struct{}{}
    wg.Add(1)
    go func(i int) {
      defer wg.Done()
      defer delete(leaks, i)

      // pretend this does an http request and returns an error
      time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
      errs <- fmt.Errorf("goroutine %d's error returned", i)
    }(i)
  }

  // wait until all the fetches are done and close the error
  // channel so the loop below terminates
  go func() {
    wg.Wait()
    close(errs)
  }()

  // return the first error
  for err := range errs {
    if err != nil {
      return err
    }
  }

  return nil
}

func main() {
  fmt.Println(fetchAll())
}
Run Code Online (Sandbox Code Playgroud)

游乐场:https://play.golang.org/p/Be93J514R5

我从阅读https://blog.golang.org/pipelines了解到,我可以创建一个信号通道来清理其他线程.或者,我可能会用context它来完成它.但似乎这样一个简单的用例应该有一个我想念的更简单的解决方案.

jot*_*oth 10

使用错误组使此过程变得更加简单。这将自动等待所有提供的Go例程成功完成,或者在任何一个例程返回错误的情况下取消所有剩余的Go例程(在这种情况下,该错误是返回给调用者的一个气泡)。

package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"

        "golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context) error {
        errs, ctx := errgroup.WithContext(ctx)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ {
                errs.Go(func() error {
                        // pretend this does an http request and returns an error                                                  
                        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)                                               
                        return fmt.Errorf("goroutine %d's error returned", i)                                                      
                })
        }

        // Wait for completion and return the first error (if any)                                                                 
        return errs.Wait()
}

func main() {
        fmt.Println(fetchAll(context.Background()))
}
Run Code Online (Sandbox Code Playgroud)

  • 但是我们无法将参数传递给 errs.Go (5认同)
  • +1,这太棒了。如此干净和惯用。不过,有一个小补充 - “剩余的”goroutines 没有受到 errgroup 的影响。Errgroup 通常只是等待它们完成。对于快速失败,每个 goroutine 应该具有相同的 `ctx` 并积极关注 `ctx.Done`。换句话说,Go 术语中的动词“取消”并不意味着“杀死 goroutine”。 (4认同)
  • 我喜欢这个,但有一件事引起了我的注意。如果您想将变量传递给 Go 例程函数怎么办?errs.Go 期望函数不带参数。您需要遵循关闭规则 https://golang.org/doc/faq#closures_and_goroutines (4认同)

Lem*_*eId 9

除了你的一个goroutine之外,其他所有人都被泄露了,因为他们还在等待发送到errs频道 - 你永远不会完成清空它的范围.你也在泄漏goroutine,他的工作就是关闭errs通道,因为waitgroup永远不会完成.

(另外,正如Andy指出的那样,从地图中删除不是线程安全的,因此需要保护互斥锁.)

但是,我认为这里甚至不需要地图,互斥体,等待组,上下文等.我会重写整个事情,只使用基本的通道操作,如下所示:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func fetchAll() error {
    var N = 4
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)
    for i := 0; i < N; i++ {
        go func(i int) {
            // dummy fetch
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            err := error(nil)
            if rand.Intn(2) == 0 {
                err = fmt.Errorf("goroutine %d's error returned", i)
            }
            ch := done // we'll send to done if nil error and to errc otherwise
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }(i)
    }
    count := 0
    for {
        select {
        case err := <-errc:
            close(quit)
            return err
        case <-done:
            count++
            if count == N {
                return nil // got all N signals, so there was no error
            }
        }
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())
    fmt.Println(fetchAll())
}
Run Code Online (Sandbox Code Playgroud)

游乐场链接:https://play.golang.org/p/mxGhSYYkOb

编辑:确实有一个愚蠢的错误,谢谢你的指出.我修改了上面的代码(我认为......).我还为添加的Realism™添加了一些随机性.

另外,我想强调的是,确实有多种方法可以解决这个问题,而我的解决方案只有一种方法.归根结底,它归结为个人品味,但总的来说,您希望努力实现"惯用"代码 - 并为您带来自然且易于理解的风格.

  • 抓得好,绝对有比赛!我写得很匆忙,就像我提到的那样,没有测试它(你应该总是这样做)。幸运的是,修复错误只会让整个代码变得更简单,在这种情况下,不需要“chan error(nil)”技巧(有时当您想阻止来自 select 语句的发送时,这很有用,因此您不需要'不必编写多个条件选择)。谢谢你指出我的错误:) (2认同)

syv*_*vex 7

这是使用joth建议的errgroup的更完整示例。它显示处理成功的数据,并在第一个错误时退出。

https://play.golang.org/p/rU1v-Mp2ijo

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "math/rand"
    "time"
)

func fetchAll() error {
    g, ctx := errgroup.WithContext(context.Background())
    results := make(chan int)
    for i := 0; i < 4; i++ {
        current := i
        g.Go(func() error {
            // Simulate delay with random errors.
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            if rand.Intn(2) == 0 {
                return fmt.Errorf("goroutine %d's error returned", current)
            }
            // Pass processed data to channel, or receive a context completion.
            select {
            case results <- current:
                return nil
            // Close out if another error occurs.
            case <-ctx.Done():
                return ctx.Err()
            }
        })
    }

    // Elegant way to close out the channel when the first error occurs or
    // when processing is successful.
    go func() {
        g.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println("processed", result)
    }

    // Wait for all fetches to complete.
    return g.Wait()
}

func main() {
    fmt.Println(fetchAll())
}

Run Code Online (Sandbox Code Playgroud)