多个goroutines在一个频道上收听

Ili*_*oly 65 go

我有多个goroutines尝试同时在同一个频道上接收.似乎在频道上开始接收的最后一个goroutine获得了价值.这是语言规范中的某个地方还是未定义的行为?

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        <-c
        c <- fmt.Sprintf("goroutine %d", i)
    }(i)
}
c <- "hi"
fmt.Println(<-c)
Run Code Online (Sandbox Code Playgroud)

输出:

goroutine 4
Run Code Online (Sandbox Code Playgroud)

在操场上的示例

编辑:

我才意识到这比我想象的要复杂得多.消息在所有goroutine周围传递.

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        msg := <-c
        c <- fmt.Sprintf("%s, hi from %d", msg, i)
    }(i)
}
c <- "original"
fmt.Println(<-c)
Run Code Online (Sandbox Code Playgroud)

输出:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4
Run Code Online (Sandbox Code Playgroud)

在操场上的示例

Ric*_*777 60

是的,这很复杂,但是有一些经验法则可以使事情变得更加直截了当.

  • 更喜欢使用正式参数来传递您传递给惯例的通道而不是访问全局范围内的通道.您可以通过这种方式获得更多的编译器检查,以及更好的模块化.
  • 避免在特定的常规例程(包括"主要" 例程)中读取和写入相同的通道.否则,死锁风险要大得多.

这是您的程序的替代版本,应用这两个指南.此案例演示了频道上的许多作家和一位读者:

c := make(chan string)

for i := 1; i <= 5; i++ {
    go func(i int, co chan<- string) {
        for j := 1; j <= 5; j++ {
            co <- fmt.Sprintf("hi from %d.%d", i, j)
        }
    }(i, c)
}

for i := 1; i <= 25; i++ {
    fmt.Println(<-c)
}
Run Code Online (Sandbox Code Playgroud)

http://play.golang.org/p/quQn7xePLw

它创建了写入单个通道的五个例程,每个通道写入五次.主要的例程读取所有二十五条消息 - 您可能会注意到它们出现的顺序通常不是连续的(即并发性很明显).

此示例演示了Go通道的一个功能:可以让多个编写器共享一个通道; Go将自动交错消息.

这同样适用于一个通道上的一个写入器和多个读取器,如此处的第二个示例所示:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)

for i := 1; i <= 5; i++ {
    go func(i int, ci <-chan int) {
        j := 1
        for v := range ci {
            time.Sleep(time.Millisecond)
            fmt.Printf("%d.%d got %d\n", i, j, v)
            j += 1
        }
        w.Done()
    }(i, c)
}

for i := 1; i <= 25; i++ {
    c <- i
}
close(c)
w.Wait()
Run Code Online (Sandbox Code Playgroud)

第二个例子包括施加在主够程等待,否则退出及时并引起其他五个够程要提前终止(由于olov此校正).

在两个示例中,都不需要缓冲.将缓冲视为仅性能增强器通常是一个很好的原则.如果你的程序在没有缓冲区的情况下没有死锁,它也不会缓冲区死锁(但反过来并不总是如此).因此,作为另一个经验法则,在没有缓冲的情况下启动,然后根据需要添加它.


Bre*_*den 17

迟到的回复,但我希望这对未来的其他人有帮助,比如Long Polling,"Global"Button,广播给大家?

Effective Go解释了这个问题:

接收器始终阻塞,直到有数据要接收.

这意味着你不能有超过1个goroutine监听1个频道,并希望所有goroutine都能获得相同的值.

运行此代码示例.

package main

import "fmt"

func main() {
    c := make(chan int)

    for i := 1; i <= 5; i++ {
        go func(i int) {
        for v := range c {
                fmt.Printf("count %d from goroutine #%d\n", v, i)
            }
        }(i)
    }

    for i := 1; i <= 25; i++ {
        c<-i
    }

    close(c)
}
Run Code Online (Sandbox Code Playgroud)

即使有5个goroutines正在收听频道,你也不会多次看到"计数1".这是因为当第一个goroutine阻塞通道时,所有其他goroutine必须排队等候.当通道被解锁时,计数已经被接收并从通道中移除,因此下一个goroutine在线获得下一个计数值.


pet*_*rSO 6

这很复杂.

另外,看看会发生什么GOMAXPROCS = NumCPU+1.例如,

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU() + 1)
    fmt.Print(runtime.GOMAXPROCS(0))
    c := make(chan string)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- ", original"
    fmt.Println(<-c)
}
Run Code Online (Sandbox Code Playgroud)

输出:

5, original, hi from 0, hi from 4
Run Code Online (Sandbox Code Playgroud)

并且,看看缓冲通道会发生什么.例如,

package main

import "fmt"

func main() {
    c := make(chan string, 5+1)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- "original"
    fmt.Println(<-c)
}
Run Code Online (Sandbox Code Playgroud)

输出:

original
Run Code Online (Sandbox Code Playgroud)

你也应该能够解释这些案例.


Ale*_*fov 6

我研究了现有的解决方案,并创建了简单的广播库https://github.com/grafov/bcast

    group := bcast.NewGroup() // you created the broadcast group
    go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members

    member := group.Join() // then you join member(s) from other goroutine(s)
    member.Send("test message") // or send messages of any type to the group 

    member1 := group.Join() // then you join member(s) from other goroutine(s)
    val := member1.Recv() // and for example listen for messages
Run Code Online (Sandbox Code Playgroud)

  • 伟大的lib,你在那里!我也找到了https://github.com/asaskevich/EventBus (2认同)