通过错误组处理 goroutine 终止和错误处理?

And*_*dyP 2 channel go goroutine

我试图以这种方式并行读取多个文件,以便每个读取文件的 go 例程将其数据写入该通道,然后有一个 go 例程监听该通道并将数据添加到映射中。这是我的玩法

下面是剧中的例子:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var myFiles = []string{"file1", "file2", "file3"}
    var myMap = make(map[string][]byte)
    dataChan := make(chan fileData, len(myFiles))
    wg := sync.WaitGroup{}
    defer close(dataChan)
    // we create a wait group of N
    wg.Add(len(myFiles))
    for _, file := range myFiles {
        // we create N go-routines, one per file, each one will return a struct containing their filename and bytes from
        // the file via the dataChan channel
        go getBytesFromFile(file, dataChan, &wg)
    }
    // we wait until the wait group is decremented to zero by each instance of getBytesFromFile() calling waitGroup.Done()
    wg.Wait()
    for i := 0; i < len(myFiles); i++ {
        // we can now read from the data channel N times.
        file := <-dataChan
        myMap[file.name] = file.bytes
    }
    fmt.Printf("%+v\n", myMap)
}

type fileData struct {
    name  string
    bytes []byte
}

// how to handle error from this method if reading file got messed up?
func getBytesFromFile(file string, dataChan chan fileData, waitGroup *sync.WaitGroup) {
    bytes := openFileAndGetBytes(file)
    dataChan <- fileData{name: file, bytes: bytes}
    waitGroup.Done()
}

func openFileAndGetBytes(file string) []byte {
    return []byte(fmt.Sprintf("these are some bytes for file %s", file))
}
Run Code Online (Sandbox Code Playgroud)

问题陈述

如何使用golang.org/x/sync/errgroup等待并处理来自 goroutine 的错误,或者是否有更好的方法(例如使用信号量)?例如,如果我的任何一个 go 例程无法从文件中读取数据,那么我想取消在任何一个例程返回错误的情况下剩余的所有内容(在这种情况下,该错误是返回给调用者的一个气泡)。对于成功案例,它应该自动等待所有提供的 go 例程成功完成。

如果文件总数为 100,我也不想生成 100 个 go 例程。如果有可能的话,我想控制并行度。

Dan*_*ell 9

如何使用 golang.org/x/sync/errgroup 等待并处理来自 goroutine 的错误,或者是否有更好的方法(例如使用信号量)?例如[...]我想取消在任何一个例程返回错误的情况下剩余的所有那些(在这种情况下,该错误是返回给调用者的一个气泡)。对于成功案例,它应该自动等待所有提供的 go 例程成功完成。

有很多方法可以跨 goroutine 传达错误状态。 errgroup虽然做了很多繁重的工作,但适合这种情况。否则你最终会实现同样的事情。

要使用errgroup我们需要处理错误(并为您的演示生成一些错误)。此外,为了取消现有的 goroutine,我们将使用来自 的上下文errgroup.NewWithContext

从 errgroup 参考中,

errgroup 包为处理公共任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。

您的游戏不支持任何错误处理。如果我们不进行任何错误处理,我们就无法收集和取消错误。所以我添加了一些代码来注入错误处理:

func openFileAndGetBytes(file string) (string, error) {
    if file == "file2" {
        return "", fmt.Errorf("%s cannot be read", file)
    }
    return fmt.Sprintf("these are some bytes for file %s", file), nil
}
Run Code Online (Sandbox Code Playgroud)

然后该错误也必须从getBytesFromFile以下位置传回:

func getBytesFromFile(file string, dataChan chan fileData) error {
    bytes, err := openFileAndGetBytes(file)
    if err == nil {
        dataChan <- fileData{name: file, bytes: bytes}
    }
    return err
}
Run Code Online (Sandbox Code Playgroud)

现在我们已经做到了这一点,我们可以将注意力转向如何启动一些 goroutine。

如果文件总数为 100,我也不想生成 100 个 go 例程。如果有可能的话,我想控制并行度。

写得好的话,任务数量、通道大小和工作人员数量通常是独立的值。诀窍是使用通道关闭(在您的例子中是上下文取消)来在 goroutine 之间传达状态。我们需要一个额外的通道来分发文件名,以及一个额外的 goroutine 来收集结果。

为了说明这一点,我的代码使用了 3 个工作线程,并添加了更多文件。我的频道没有缓冲。这使我们能够看到一些文件得到处理,而另一些文件则被中止。如果您缓冲通道,该示例仍然有效,但在处理取消之前更有可能需要处理额外的工作。试验缓冲区大小以及工作线程数和要处理的文件数。

    var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
    fileChan := make(chan string)
    dataChan := make(chan fileData)
Run Code Online (Sandbox Code Playgroud)

为了启动工作程序,我们不是为每个文件启动一个工作程序,而是启动我们想要的数字 - 这里是 3。

    for i := 0; i < 3; i++ {
        worker_num := i
        g.Go(func() error {
            for file := range fileChan {
                if err := getBytesFromFile(file, dataChan); err != nil {
                    fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
                    return err
                } else if err := ctx.Err(); err != nil {
                    fmt.Println("worker", worker_num, "context error in worker:", err.Error())
                    return err
                }
            }
            fmt.Println("worker", worker_num, "processed all work on channel")
            return nil

        })
    }
Run Code Online (Sandbox Code Playgroud)

工作人员调用您的getBytesFromFile函数。如果它返回一个错误,我们就返回一个错误。 errgroup在这种情况下将自动取消我们的上下文。但是,操作的确切顺序是不确定的,因此在取消上下文之前可能会或可能不会处理更多文件。我将在下面展示几种可能性。

通过rangeing over fileChan,工作人员会自动从通道关闭中接取工作结束。如果我们收到错误,我们可以errgroup立即将其返回。否则,如果上下文已被取消,我们可以立即返回取消错误。

您可能认为这g.Go会自动取消我们的功能。但它不能。除了进程终止之外,没有其他方法可以取消正在运行的函数Goerrgroup.Group.Go的函数参数必须根据其上下文的状态在适当的时候取消自身。

现在我们可以将注意力转向将文件放在 上的东西fileChan。我们这里有 2 个选项:我们可以使用大小为 的缓冲通道myFiles,就像您所做的那样。我们可以用待处理的作业填充整个通道。仅当您在创建通道时知道作业数量时,这才是一个选项。另一种选择是使用额外的“分发”goroutine,它可以阻止写入,fileChan以便我们的“主”goroutine 可以继续。

    // dispatch files
    g.Go(func() error {
        defer close(fileChan)
        done := ctx.Done()
        for _, file := range myFiles {
            select {
            case fileChan <- file:
                continue
            case <-done:
                break
            }
        }
        return ctx.Err()
    })
Run Code Online (Sandbox Code Playgroud)

我不确定在这种情况下是否绝对有必要将其放入同一个 errgroup 中,因为我们无法在分发器 goroutine 中收到错误。但是,无论工作调度程序是否可能生成错误,从errgroup 的 Pipeline 示例中提取的这种通用模式都会起作用。

这个功能非常简单,但神奇之处在于select通道ctx.Done()。要么我们写入工作通道,要么如果我们的上下文完成,我们就会失败。这允许我们在一名工作人员未能完成一个文件时停止分配工作。

我们defer close(fileChan)这样,无论我们为什么完成(要么我们分发了所有工作,要么上下文被取消),工作人员都知道传入工作队列上将不再有工作(即fileChan)。

我们还需要一种同步机制:一旦所有的工作都被分发,并且所有的结果都在或者工作被取消完成(例如,在我们的 errgroupWait()返回之后),我们需要关闭我们的结果通道dataChan。这向结果收集器发出信号:没有更多结果需要收集。

    var err error // we'll need this later!
    go func() {
        err = g.Wait()
        close(dataChan)
    }()
Run Code Online (Sandbox Code Playgroud)

我们不能 - 也不需要 - 将其放入errgroup.Group. 该函数无法返回错误,并且它不能等待自身返回错误close(dataChan)。所以它进入了一个常规的旧 goroutine,没有errgroup。

最后我们可以收集结果。通过专用的工作协程、分发协程和等待工作并通知不再写入的协程dataChan,我们可以在 的“主”协程中收集所有结果main

    for data := range dataChan {
        myMap[data.name] = data.bytes
    }
    if err != nil { // this was set in our final goroutine, remember
        fmt.Println("errgroup Error:", err.Error())
    }
Run Code Online (Sandbox Code Playgroud)

我做了一些小更改,以便更容易看到输出。您可能已经注意到我将文件内容从 更改[]bytestring. 这纯粹是为了使结果易于阅读。为此,我使用encoding/json格式化结果,以便轻松阅读它们并将它们粘贴到 SO 中。这是我经常用来缩进结构化数据的常见模式:

    enc := json.NewEncoder(os.Stdout)
    enc.SetIndent("", " ")
    if err := enc.Encode(myMap); err != nil {
        panic(err)
    }
Run Code Online (Sandbox Code Playgroud)

最后我们准备好运行了。现在我们可以看到许多不同的结果,具体取决于 goroutine 的执行顺序。但它们都是有效的执行路径。

worker 2 failed to process file2 : file2 cannot be read
worker 0 context error in worker: context canceled
worker 1 context error in worker: context canceled
errgroup Error: file2 cannot be read
{
 "file1": "these are some bytes for file file1",
 "file3": "these are some bytes for file file3"
}

Program exited.
Run Code Online (Sandbox Code Playgroud)

在此结果中,剩余工作 (file4file5) 未添加到通道中。请记住,无缓冲通道不存储数据。对于要写入通道的这些任务,必须有工作人员在那里读取它们。相反,上下文在file2失败后被取消,并且分发函数遵循<-done其选择中的路径。 file1并且file3已经被处理了。

这是一个不同的结果(我刚刚运行了游乐场共享几次以获得不同的结果)。

worker 1 failed to process file2 : file2 cannot be read
worker 2 processed all work on channel
worker 0 processed all work on channel
errgroup Error: file2 cannot be read
{
 "file1": "these are some bytes for file file1",
 "file3": "these are some bytes for file file3",
 "file4": "these are some bytes for file file4",
 "file5": "these are some bytes for file file5",
 "file6": "these are some bytes for file file6"
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,看起来有点像我们的取消失败了。但真正发生的事情是,goroutines 只是“碰巧”排队并完成了其余的工作,然后才errorgroup发现工人的失败并取消了上下文。

错误组做什么

当您使用 errorgroup 时,您实际上可以从中得到两件事:

  • 轻松访问您的工作人员返回的第一个错误;
  • 获取错误组将为您取消的上下文

请记住,errorgroup 不会取消goroutine。一开始这让我有点困惑。Errorgroup 取消上下文。您有责任将该上下文的状态应用于您的 goroutine(请记住,goroutine 必须自行结束,errorgroup不能结束它)。

关于文件操作上下文以及未完成的工作的最后一个旁白

大多数文件操作(例如io.Copyos.ReadFile)实际上是后续操作的循环Read。但是ioos不直接支持上下文。因此,如果您有一个工作人员正在读取文件,并且您自己没有实现循环Read,那么您将没有机会根据上下文取消。对于您的情况来说,这可能没问题 - 当然,您可能已经读取了比实际需要更多的文件,但这只是因为错误发生时您已经在读取它们。我个人会接受这种情况并且不会实现我自己的读取循环。

代码

https://go.dev/play/p/9qfESp_eB-C

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "os"

    "golang.org/x/sync/errgroup"
)

func main() {
    var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
    fileChan := make(chan string)
    dataChan := make(chan fileData)
    g, ctx := errgroup.WithContext(context.Background())
    for i := 0; i < 3; i++ {
        worker_num := i
        g.Go(func() error {
            for file := range fileChan {
                if err := getBytesFromFile(file, dataChan); err != nil {
                    fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
                    return err
                } else if err := ctx.Err(); err != nil {
                    fmt.Println("worker", worker_num, "context error in worker:", err.Error())
                    return err
                }
            }
            fmt.Println("worker", worker_num, "processed all work on channel")
            return nil

        })
    }
    // dispatch files
    g.Go(func() error {
        defer close(fileChan)
        done := ctx.Done()
        for _, file := range myFiles {
            if err := ctx.Err(); err != nil {
                return err
            }
            select {
            case fileChan <- file:
                continue
            case <-done:
                break
            }
        }
        return ctx.Err()
    })
    var err error
    go func() {
        err = g.Wait()
        close(dataChan)
    }()
    var myMap = make(map[string]string)

    for data := range dataChan {
        myMap[data.name] = data.bytes
    }
    if err != nil {
        fmt.Println("errgroup Error:", err.Error())
    }
    enc := json.NewEncoder(os.Stdout)
    enc.SetIndent("", " ")
    if err := enc.Encode(myMap); err != nil {
        panic(err)
    }
}

type fileData struct {
    name,
    bytes string
}

func getBytesFromFile(file string, dataChan chan fileData) error {
    bytes, err := openFileAndGetBytes(file)
    if err == nil {
        dataChan <- fileData{name: file, bytes: bytes}
    }
    return err
}

func openFileAndGetBytes(file string) (string, error) {
    if file == "file2" {
        return "", fmt.Errorf("%s cannot be read", file)
    }
    return fmt.Sprintf("these are some bytes for file %s", file), nil
}
Run Code Online (Sandbox Code Playgroud)

  • @AndyP,创建一个包含文件名和索引字段的结构类型。发送此结构类型的值而不是 fileChan 上的文件名。 (2认同)