Jam*_*ell 2 concurrency shutdown channel go goroutine
我希望我的常规工作者(ProcessToDo()在下面的代码中)等待,直到关闭之前处理所有"排队"工作.
工作程序具有"待办事项"通道(缓冲),通过该通道向其发送工作.它有一个"完成"通道告诉它开始关机.文档说如果满足多个选择,通道上的选择将选择"伪随机值"...这意味着在完成所有缓冲工作之前触发关闭(返回).
在下面的代码示例中,我希望打印所有20条消息...
package main
import (
"time"
"fmt"
)
func ProcessToDo(done chan struct{}, todo chan string) {
for {
select {
case work, ok := <-todo:
if !ok {
fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
return
}
fmt.Printf("todo: %q\n", work)
time.Sleep(100 * time.Millisecond)
case _, ok := <-done:
if ok {
fmt.Printf("Shutting down ProcessToDo - done message received!\n")
} else {
fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
}
close(todo)
return
}
}
}
func main() {
done := make(chan struct{})
todo := make(chan string, 100)
go ProcessToDo(done, todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
time.Sleep(1 * time.Second)
close(done)
time.Sleep(4 * time.Second)
}
Run Code Online (Sandbox Code Playgroud)
done在您的情况下通道完全没必要,因为您可以通过关闭todo通道本身来发出关闭信号.
并使用for range通道进行迭代,直到通道关闭且其缓冲区为空.
你应该有一个done频道,但只有这样goroutine本身才能发出信号表明它完成了工作,所以主要的goroutine可以继续或退出.
这个变体等同于你的变体,更简单,并且不需要time.Sleep()调用等待其他goroutine(无论如何这都是错误的和不确定的).在Go Playground尝试:
func ProcessToDo(done chan struct{}, todo chan string) {
for work := range todo {
fmt.Printf("todo: %q\n", work)
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
done <- struct{}{} // Signal that we processed all jobs
}
func main() {
done := make(chan struct{})
todo := make(chan string, 100)
go ProcessToDo(done, todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
close(todo)
<-done // Wait until the other goroutine finishes all jobs
}
Run Code Online (Sandbox Code Playgroud)
另请注意,工作人员goroutine应该发出完成信号,defer因此如果以某种意外的方式返回,或者恐慌,主goroutine将不会卡在等待工人.所以它应该像这样开始:
defer func() {
done <- struct{}{} // Signal that we processed all jobs
}()
Run Code Online (Sandbox Code Playgroud)
您还可以使用sync.WaitGroup将主goroutine同步到worker(等待它).实际上,如果您计划使用多个worker goroutine,那么比从done通道中读取多个值更清晰.WaitGroup由于它有一个Done()方法(这是一个函数调用),所以它更简单地表示完成,因此您不需要匿名函数:
defer wg.Done()
Run Code Online (Sandbox Code Playgroud)
有关完整示例,请参阅JimB的anwserWaitGroup.
for range如果你想使用多个worker goroutine,那么使用the 也是惯用的:通道是同步的,因此你不需要任何额外的代码来同步对todo通道或从它接收的作业的访问.如果您关闭todo通道main(),那将正确发出所有工作人员的信号.但是,当然所有排队的工作都将被接收和处理一次.
现在采用WaitGroup 用于使主要goroutine等待工人的变体(JimB的答案):如果你想要超过1个工人goroutine怎么办; 同时处理你的工作(很可能并行)?
您需要添加/更改代码的唯一方法是:要真正启动多个代码:
for i := 0; i < 10; i++ {
wg.Add(1)
go ProcessToDo(todo)
}
Run Code Online (Sandbox Code Playgroud)
在不更改任何其他内容的情况下,您现在拥有一个正确的并发应用程序,它使用10个并发goroutine接收和处理您的作业.我们没有使用任何"丑陋" time.Sleep()(我们使用的只是模拟慢速处理,而不是等待其他goroutines),而且你不需要任何额外的同步.
让通道的使用者关闭它通常是一个坏主意,因为在关闭的通道上发送是一种恐慌。
在这种情况下,如果您不想在所有消息发送之前中断消费者,只需使用for...range循环并在完成后关闭通道。您还需要一个类似 a 的信号WaitGroup来等待 goroutine 完成(而不是使用 time.Sleep)
http://play.golang.org/p/r97vRPsxEb
var wg sync.WaitGroup
func ProcessToDo(todo chan string) {
defer wg.Done()
for work := range todo {
fmt.Printf("todo: %q\n", work)
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
}
func main() {
todo := make(chan string, 100)
wg.Add(1)
go ProcessToDo(todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
close(todo)
wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1090 次 |
| 最近记录: |