我正在尝试实现一个简单的逻辑,其中Producer将数据发送到ch
具有永久for
循环的通道,而 Consumer 从该通道读取ch
。
当 Producer 收到通道上的信号时,它会停止生产并退出永远循环quit
。
代码是这样的(另见这个操场)
func main() {
ch := make(chan int)
quit := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go produce(ch, quit, &wg)
go consume(ch)
time.Sleep(1 * time.Millisecond)
fmt.Println("CLOSE")
close(quit)
wg.Wait()
}
func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
for i := 0; ; i++ {
select {
case <-quit:
close(ch)
fmt.Println("exit")
wg.Done()
return //we exit
default:
ch <- i
fmt.Println("Producer sends", i)
}
}
}
func consume(ch chan int) {
for {
runtime.Gosched() // give the opportunity to the main goroutine to close the "quit" channel
select {
case i, more := <-ch:
if !more {
fmt.Println("exit consumer")
return
}
fmt.Println("Consumer receives", i)
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果我在我的机器(有 4 个内核的 Mac)上运行这段代码,一切正常。如果我在Go Playgroud上尝试相同的代码,它总是超时。我猜这是因为 Go Playground 是一个单核,所以无限循环不会给其他 goroutines 运行的机会,但是我不明白为什么该指令runtime.Gosched()
没有任何效果。
只是为了完成我所看到的图片,如果我GOMAXPROCS=1
在我的 Mac 上设置,该程序仍然可以正常工作并按预期退出。如果我GOMAXPROCS=1
在 Mac 上设置并删除runtime.Gosched()
指令,行为就会变得脆弱:有时程序按预期终止,有时它似乎永远不会退出无限循环。
您创建了一种在实际程序中不应发生的病态情况,因此调度程序未针对处理此问题进行优化。结合操场上的假时间实现,在超时之前你会得到太多的生产者和消费者的循环。
生产者 goroutine 正在尽可能快地创建值,而消费者则始终准备好接收它们。使用 时GOMAPXPROCS=1
,调度程序将所有时间都花在两者之间,然后被迫抢占可用工作来检查主 goroutine,这比操场允许的时间要长。
如果我们为生产者-消费者对添加一些要做的事情,我们就可以限制他们独占调度程序的时间。例如,time.Sleep(time.Microsecond)
向消费者添加 a 将导致 Playground 打印 1000 个值。这也表明了模拟时间在操场上的“准确度”,因为这对于普通硬件来说是不可能的,因为普通硬件需要非零时间来处理每条消息。
虽然这是一个有趣的案例,但这对实际程序影响不大。
一些注意事项,您可以range
通过通道接收所有值,defer wg.Done
如果可能,您应该始终在 goroutine 的开头,您可以在其中发送值,select
case
这允许您在发送未准备好时实际取消 for-select 循环,如果您想要“退出消费者”消息,您WaitGroup
还需要将 发送给消费者。
https://play.golang.org/p/WyPmpY9pFl7
func main() {
ch := make(chan int)
quit := make(chan bool)
var wg sync.WaitGroup
wg.Add(2)
go produce(ch, quit, &wg)
go consume(ch, &wg)
time.Sleep(50 * time.Microsecond)
fmt.Println("CLOSE")
close(quit)
wg.Wait()
}
func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-quit:
close(ch)
fmt.Println("exit")
return
case ch <- i:
fmt.Println("Producer sends", i)
}
}
}
func consume(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for i := range ch {
fmt.Println("Consumer receives", i)
time.Sleep(time.Microsecond)
}
fmt.Println("exit consumer")
return
}
Run Code Online (Sandbox Code Playgroud)