我正在努力使用通道来实现队列.具体来说,我试图使用通道的大小来限制同时goroutine的数量.即,我写了以下代码:
package main
import "fmt"
import "time"
import "math/rand"
func runTask (t string, ch *chan bool) {
start := time.Now()
fmt.Println("starting task", t)
time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
fmt.Println("done running task", t, "in", time.Since(start))
<- *ch
}
func main() {
numWorkers := 3
files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
activeWorkers := make(chan bool, numWorkers)
for _, f := range files {
activeWorkers <- true
fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers))
go runTask(f, &activeWorkers)
}
select{}
}
Run Code Online (Sandbox Code Playgroud)
现在,代码崩溃了:
throw: all goroutines are asleep - deadlock!
Run Code Online (Sandbox Code Playgroud)
我的期望是对select的调用会永远阻塞,让goroutines终止而不会出现死锁.
所以我有一个双重的问题:为什么不选择永久阻塞,而不是抛出一个time.Sleep()调用for循环后,我怎么能避免死锁?
干杯,
-mtw
Arlen Cuss已经写了一个很好的答案.我只是想为你的工作队列建议另一种设计.您可以只生成有限数量的工作人员goroutine,而不是限制您的频道可以缓冲的条目数量.像这样的东西:
package main
import "fmt"
import "time"
import "math/rand"
func runTask(t string) string {
start := time.Now()
fmt.Println("starting task", t)
time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
fmt.Println("done running task", t, "in", time.Since(start))
return t
}
func worker(in chan string, out chan string) {
for t := range in {
out <- runTask(t)
}
}
func main() {
numWorkers := 3
// spawn workers
in, out := make(chan string), make(chan string)
for i := 0; i < numWorkers; i++ {
go worker(in, out)
}
files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
// schedule tasks
go func() {
for _, f := range files {
in <- f
}
}()
// get results
for _ = range files {
<-out
}
}
Run Code Online (Sandbox Code Playgroud)
如果您只想等到所有任务都已执行,也可以使用sync.WaitGroup,但使用out
通道的优势在于您可以稍后聚合结果.例如,如果每个任务返回该文件中的单词数,则可以使用最终循环来总结所有单个单词计数.