And*_*dyP 2 channel go goroutine
我试图以这种方式并行读取多个文件,以便每个读取文件的 go 例程将其数据写入该通道,然后有一个 go 例程监听该通道并将数据添加到映射中。这是我的玩法。
下面是剧中的例子:
package main
import (
"fmt"
"sync"
)
func main() {
var myFiles = []string{"file1", "file2", "file3"}
var myMap = make(map[string][]byte)
dataChan := make(chan fileData, len(myFiles))
wg := sync.WaitGroup{}
defer close(dataChan)
// we create a wait group of N
wg.Add(len(myFiles))
for _, file := range myFiles {
// we create N go-routines, one per file, each one will return a struct containing their filename and bytes from
// the file via the dataChan channel
go getBytesFromFile(file, dataChan, &wg)
}
// we wait until the wait group is decremented to zero by each instance of getBytesFromFile() calling waitGroup.Done()
wg.Wait()
for i := 0; i < len(myFiles); i++ {
// we can now read from the data channel N times.
file := <-dataChan
myMap[file.name] = file.bytes
}
fmt.Printf("%+v\n", myMap)
}
type fileData struct {
name string
bytes []byte
}
// how to handle error from this method if reading file got messed up?
func getBytesFromFile(file string, dataChan chan fileData, waitGroup *sync.WaitGroup) {
bytes := openFileAndGetBytes(file)
dataChan <- fileData{name: file, bytes: bytes}
waitGroup.Done()
}
func openFileAndGetBytes(file string) []byte {
return []byte(fmt.Sprintf("these are some bytes for file %s", file))
}
Run Code Online (Sandbox Code Playgroud)
问题陈述
如何使用golang.org/x/sync/errgroup等待并处理来自 goroutine 的错误,或者是否有更好的方法(例如使用信号量)?例如,如果我的任何一个 go 例程无法从文件中读取数据,那么我想取消在任何一个例程返回错误的情况下剩余的所有内容(在这种情况下,该错误是返回给调用者的一个气泡)。对于成功案例,它应该自动等待所有提供的 go 例程成功完成。
如果文件总数为 100,我也不想生成 100 个 go 例程。如果有可能的话,我想控制并行度。
如何使用 golang.org/x/sync/errgroup 等待并处理来自 goroutine 的错误,或者是否有更好的方法(例如使用信号量)?例如[...]我想取消在任何一个例程返回错误的情况下剩余的所有那些(在这种情况下,该错误是返回给调用者的一个气泡)。对于成功案例,它应该自动等待所有提供的 go 例程成功完成。
有很多方法可以跨 goroutine 传达错误状态。 errgroup虽然做了很多繁重的工作,但适合这种情况。否则你最终会实现同样的事情。
要使用errgroup我们需要处理错误(并为您的演示生成一些错误)。此外,为了取消现有的 goroutine,我们将使用来自 的上下文errgroup.NewWithContext。
从 errgroup 参考中,
errgroup 包为处理公共任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。
您的游戏不支持任何错误处理。如果我们不进行任何错误处理,我们就无法收集和取消错误。所以我添加了一些代码来注入错误处理:
func openFileAndGetBytes(file string) (string, error) {
if file == "file2" {
return "", fmt.Errorf("%s cannot be read", file)
}
return fmt.Sprintf("these are some bytes for file %s", file), nil
}
Run Code Online (Sandbox Code Playgroud)
然后该错误也必须从getBytesFromFile以下位置传回:
func getBytesFromFile(file string, dataChan chan fileData) error {
bytes, err := openFileAndGetBytes(file)
if err == nil {
dataChan <- fileData{name: file, bytes: bytes}
}
return err
}
Run Code Online (Sandbox Code Playgroud)
现在我们已经做到了这一点,我们可以将注意力转向如何启动一些 goroutine。
如果文件总数为 100,我也不想生成 100 个 go 例程。如果有可能的话,我想控制并行度。
写得好的话,任务数量、通道大小和工作人员数量通常是独立的值。诀窍是使用通道关闭(在您的例子中是上下文取消)来在 goroutine 之间传达状态。我们需要一个额外的通道来分发文件名,以及一个额外的 goroutine 来收集结果。
为了说明这一点,我的代码使用了 3 个工作线程,并添加了更多文件。我的频道没有缓冲。这使我们能够看到一些文件得到处理,而另一些文件则被中止。如果您缓冲通道,该示例仍然有效,但在处理取消之前更有可能需要处理额外的工作。试验缓冲区大小以及工作线程数和要处理的文件数。
var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
fileChan := make(chan string)
dataChan := make(chan fileData)
Run Code Online (Sandbox Code Playgroud)
为了启动工作程序,我们不是为每个文件启动一个工作程序,而是启动我们想要的数字 - 这里是 3。
for i := 0; i < 3; i++ {
worker_num := i
g.Go(func() error {
for file := range fileChan {
if err := getBytesFromFile(file, dataChan); err != nil {
fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
return err
} else if err := ctx.Err(); err != nil {
fmt.Println("worker", worker_num, "context error in worker:", err.Error())
return err
}
}
fmt.Println("worker", worker_num, "processed all work on channel")
return nil
})
}
Run Code Online (Sandbox Code Playgroud)
工作人员调用您的getBytesFromFile函数。如果它返回一个错误,我们就返回一个错误。 errgroup在这种情况下将自动取消我们的上下文。但是,操作的确切顺序是不确定的,因此在取消上下文之前可能会或可能不会处理更多文件。我将在下面展示几种可能性。
通过rangeing over fileChan,工作人员会自动从通道关闭中接取工作结束。如果我们收到错误,我们可以errgroup立即将其返回。否则,如果上下文已被取消,我们可以立即返回取消错误。
您可能认为这g.Go会自动取消我们的功能。但它不能。除了进程终止之外,没有其他方法可以取消正在运行的函数Go。 errgroup.Group.Go的函数参数必须根据其上下文的状态在适当的时候取消自身。
现在我们可以将注意力转向将文件放在 上的东西fileChan。我们这里有 2 个选项:我们可以使用大小为 的缓冲通道myFiles,就像您所做的那样。我们可以用待处理的作业填充整个通道。仅当您在创建通道时知道作业数量时,这才是一个选项。另一种选择是使用额外的“分发”goroutine,它可以阻止写入,fileChan以便我们的“主”goroutine 可以继续。
// dispatch files
g.Go(func() error {
defer close(fileChan)
done := ctx.Done()
for _, file := range myFiles {
select {
case fileChan <- file:
continue
case <-done:
break
}
}
return ctx.Err()
})
Run Code Online (Sandbox Code Playgroud)
我不确定在这种情况下是否绝对有必要将其放入同一个 errgroup 中,因为我们无法在分发器 goroutine 中收到错误。但是,无论工作调度程序是否可能生成错误,从errgroup 的 Pipeline 示例中提取的这种通用模式都会起作用。
这个功能非常简单,但神奇之处在于select通道ctx.Done()。要么我们写入工作通道,要么如果我们的上下文完成,我们就会失败。这允许我们在一名工作人员未能完成一个文件时停止分配工作。
我们defer close(fileChan)这样,无论我们为什么完成(要么我们分发了所有工作,要么上下文被取消),工作人员都知道传入工作队列上将不再有工作(即fileChan)。
我们还需要一种同步机制:一旦所有的工作都被分发,并且所有的结果都在或者工作被取消完成(例如,在我们的 errgroupWait()返回之后),我们需要关闭我们的结果通道dataChan。这向结果收集器发出信号:没有更多结果需要收集。
var err error // we'll need this later!
go func() {
err = g.Wait()
close(dataChan)
}()
Run Code Online (Sandbox Code Playgroud)
我们不能 - 也不需要 - 将其放入errgroup.Group. 该函数无法返回错误,并且它不能等待自身返回错误close(dataChan)。所以它进入了一个常规的旧 goroutine,没有errgroup。
最后我们可以收集结果。通过专用的工作协程、分发协程和等待工作并通知不再写入的协程dataChan,我们可以在 的“主”协程中收集所有结果main。
for data := range dataChan {
myMap[data.name] = data.bytes
}
if err != nil { // this was set in our final goroutine, remember
fmt.Println("errgroup Error:", err.Error())
}
Run Code Online (Sandbox Code Playgroud)
我做了一些小更改,以便更容易看到输出。您可能已经注意到我将文件内容从 更改[]byte为string. 这纯粹是为了使结果易于阅读。为此,我使用encoding/json格式化结果,以便轻松阅读它们并将它们粘贴到 SO 中。这是我经常用来缩进结构化数据的常见模式:
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(myMap); err != nil {
panic(err)
}
Run Code Online (Sandbox Code Playgroud)
最后我们准备好运行了。现在我们可以看到许多不同的结果,具体取决于 goroutine 的执行顺序。但它们都是有效的执行路径。
worker 2 failed to process file2 : file2 cannot be read
worker 0 context error in worker: context canceled
worker 1 context error in worker: context canceled
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3"
}
Program exited.
Run Code Online (Sandbox Code Playgroud)
在此结果中,剩余工作 (file4和file5) 未添加到通道中。请记住,无缓冲通道不存储数据。对于要写入通道的这些任务,必须有工作人员在那里读取它们。相反,上下文在file2失败后被取消,并且分发函数遵循<-done其选择中的路径。 file1并且file3已经被处理了。
这是一个不同的结果(我刚刚运行了游乐场共享几次以获得不同的结果)。
worker 1 failed to process file2 : file2 cannot be read
worker 2 processed all work on channel
worker 0 processed all work on channel
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3",
"file4": "these are some bytes for file file4",
"file5": "these are some bytes for file file5",
"file6": "these are some bytes for file file6"
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,看起来有点像我们的取消失败了。但真正发生的事情是,goroutines 只是“碰巧”排队并完成了其余的工作,然后才errorgroup发现工人的失败并取消了上下文。
当您使用 errorgroup 时,您实际上可以从中得到两件事:
请记住,errorgroup 不会取消goroutine。一开始这让我有点困惑。Errorgroup 取消上下文。您有责任将该上下文的状态应用于您的 goroutine(请记住,goroutine 必须自行结束,errorgroup不能结束它)。
大多数文件操作(例如io.Copy或os.ReadFile)实际上是后续操作的循环Read。但是io和os不直接支持上下文。因此,如果您有一个工作人员正在读取文件,并且您自己没有实现循环Read,那么您将没有机会根据上下文取消。对于您的情况来说,这可能没问题 - 当然,您可能已经读取了比实际需要更多的文件,但这只是因为错误发生时您已经在读取它们。我个人会接受这种情况并且不会实现我自己的读取循环。
https://go.dev/play/p/9qfESp_eB-C
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"golang.org/x/sync/errgroup"
)
func main() {
var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
fileChan := make(chan string)
dataChan := make(chan fileData)
g, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 3; i++ {
worker_num := i
g.Go(func() error {
for file := range fileChan {
if err := getBytesFromFile(file, dataChan); err != nil {
fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
return err
} else if err := ctx.Err(); err != nil {
fmt.Println("worker", worker_num, "context error in worker:", err.Error())
return err
}
}
fmt.Println("worker", worker_num, "processed all work on channel")
return nil
})
}
// dispatch files
g.Go(func() error {
defer close(fileChan)
done := ctx.Done()
for _, file := range myFiles {
if err := ctx.Err(); err != nil {
return err
}
select {
case fileChan <- file:
continue
case <-done:
break
}
}
return ctx.Err()
})
var err error
go func() {
err = g.Wait()
close(dataChan)
}()
var myMap = make(map[string]string)
for data := range dataChan {
myMap[data.name] = data.bytes
}
if err != nil {
fmt.Println("errgroup Error:", err.Error())
}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(myMap); err != nil {
panic(err)
}
}
type fileData struct {
name,
bytes string
}
func getBytesFromFile(file string, dataChan chan fileData) error {
bytes, err := openFileAndGetBytes(file)
if err == nil {
dataChan <- fileData{name: file, bytes: bytes}
}
return err
}
func openFileAndGetBytes(file string) (string, error) {
if file == "file2" {
return "", fmt.Errorf("%s cannot be read", file)
}
return fmt.Sprintf("these are some bytes for file %s", file), nil
}
Run Code Online (Sandbox Code Playgroud)