解决goroutines僵局

Jak*_*ler 2 concurrency channel go goroutine

我一直在努力解决我在Golang并发中遇到的这个简单问题.我一直在寻找所有可能的解决方案,但没有发现我的问题(或者我可能会错过一个).这是我的代码:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int) {

    for i:=0; i<num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2)
    go producer(ch, 200*time.Millisecond, 5)

    for {
        fmt.Println(<-ch)    
    }

    close(ch)
}
Run Code Online (Sandbox Code Playgroud)

它打印错误:

致命错误:所有goroutines都睡着了 - 僵局!

goroutine 1 [chan receive]:main.main()D:/Code/go/src/testconcurrency/main.go:23 + 0xca退出状态2

什么是避免此错误的有效方法?,谢谢.

icz*_*cza 6

你有"短命"的生产者,他们只在通道上发送有限时间的值,并且你有一个无限for循环,它无休止地从通道接收值,没有终止条件,并且通道只关闭在这无尽的循环之后.一旦生产者停止发送价值,就会陷入僵局.

通道必须由生产者关闭,表明不再发送任何值.由于你有多个生产者没有同步(生产者彼此不同步),一般你不知道哪一个会先完成,所以你不能指定一个关闭渠道(一个渠道只能关闭一次) ,看看为什么Go的频道可以关闭两次? ;和关闭未知长度的频道.

你必须"协调"生产者,当所有人完成工作后,协调员应关闭渠道.

消费者应该for range在通道上使用a ,因为for range构造接收来自通道的所有值,在它关闭之前在它上面发送,然后它自动终止.

为了协调,建议使用sync.WaitGroup.无论您是在这种情况下使用全局的还是本地的,都将它传递给生产者,由您自己决定.使用本地将使解决方案更通用,更容易扩展.需要注意的一点是,必须将指针传递给sync.WaitGroup.每当你启动一个新的生成器时,使用增加waitgroup WaitGroup.Add().当一个制作人完成时,它可以使用WaitGroup.Done(),最好是使用defer(因此它无论如何运行,在异常情况下减轻死锁).控制器可以等待所有生产者完成使用WaitGroup.Wait().

这是一个完整的解决方案:

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)

    wg.Add(1)
    go producer(ch, 100*time.Millisecond, 2, wg)
    wg.Add(1)
    go producer(ch, 200*time.Millisecond, 5, wg)

    go func() {
        wg.Wait()
        close(ch)
    }()

    for v := range ch {
        fmt.Println(v)
    }
}
Run Code Online (Sandbox Code Playgroud)

输出(在Go Playground上试试):

0
0
1
1
2
3
4
Run Code Online (Sandbox Code Playgroud)

请参阅相关问题:在Golang中完成goroutines之前,阻止main()函数终止


Grz*_*Żur 5

可以使用两个等待组以优雅的方式解决此问题.通过关闭渠道,ch我们向消费者发出信号,表示没有更多数据.

这些解决方案适用于更多消费者.

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan<- int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func consumer(ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for x := range ch {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    producers := &sync.WaitGroup{}
    consumers := &sync.WaitGroup{}

    producers.Add(2)
    go producer(ch, 100*time.Millisecond, 2, producers)
    go producer(ch, 200*time.Millisecond, 5, producers)

    consumers.Add(1)
    go consumer(ch, consumers)

    producers.Wait()
    close(ch)
    consumers.Wait()
}
Run Code Online (Sandbox Code Playgroud)