Go中生产者/消费者的最大成语是什么?

Wil*_*ill 14 concurrency go

我想做的是有一组生产者goroutines(其中一些可能会或可能不会完成)和一个消费者例程.问题在于括号中的警告 - 我们不知道将返回答案的总数.

所以我想做的是:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int) {
  // May or may not produce.
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
}

func main() {
  c := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // If we include a close, then that's WRONG. Chan will be closed
  // but a producer will try to write to it. Runtime error.
  close(c)

  // If we don't close, then that's WRONG. All goroutines will
  // deadlock, since the range keyword will look for a close.
  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}
Run Code Online (Sandbox Code Playgroud)

所以问题是,如果我关闭它是错误的,如果我不关闭 - 它仍然是错误的(参见代码中的注释).

现在,解决方案将是一个带外信号通道,所有生产者都写道:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int, signal chan bool) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  signal <- true
}

func main() {
  c := make(chan int, 10)
  signal := make(chan bool, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // This is basically a 'join'.
  num_done := 0
  for num_done < 10 {
    <- signal
    num_done++
  }
  close(c)

  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}
Run Code Online (Sandbox Code Playgroud)

这完全符合我的要求!但对我来说,这似乎是一口气.我的问题是:是否有任何成语/技巧可以让我以更简单的方式做类似的事情?

我看了一下:http://golang.org/doc/codewalk/sharemem/ 看起来像completechan(在开始时初始化main)在一个范围内使用但从未关闭.我不明白怎么做.

如果有人有任何见解,我将非常感激.干杯!


编辑:fls0815有答案,并且还回答了无关闭通道范围如何工作的问题.

我上面的代码修改为工作(在fls0815之前完成提供代码):

package main

import (
  "fmt"
  "math/rand"
  "sync"
)

var wg_prod sync.WaitGroup
var wg_cons sync.WaitGroup

func producer(c chan int) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  wg_prod.Done()
}

func main() {
  c := make(chan int, 10)
  wg_prod.Add(10)
  for i := 0; i < 10; i++ {
    go producer(c)
  }

  wg_cons.Add(1)
  go func() {
    for num := range c {
      fmt.Printf("Producer produced: %d\n", num)
    }
    wg_cons.Done()
  } ()

  wg_prod.Wait()
  close(c)
  wg_cons.Wait()
  fmt.Println("All done.")
}
Run Code Online (Sandbox Code Playgroud)

fls*_*815 14

只有生产者应关闭渠道.您可以通过在生成器启动后调用range在生成的通道上迭代()的消费者来实现您的目标.在你的主线程中,你等待(见sync.WaitGroup)直到你的消费者/生产者完成他们的工作.生产者完成后,关闭生成的通道,这将迫使您的消费者退出(range当通道关闭且没有剩余缓冲项时将退出).

示例代码:

package main

import (
    "log"
    "sync"
    "time"
    "math/rand"
    "runtime"
)

func consumer() {
    defer consumer_wg.Done()

    for item := range resultingChannel {
        log.Println("Consumed:", item)
    }
}

func producer() {
    defer producer_wg.Done()

    success := rand.Float32() > 0.5
    if success {
        resultingChannel <- rand.Int()
    }
}

var resultingChannel = make(chan int)
var producer_wg sync.WaitGroup
var consumer_wg sync.WaitGroup

func main() {
    rand.Seed(time.Now().Unix())

    for c := 0; c < runtime.NumCPU(); c++ {
        producer_wg.Add(1)  
        go producer()
    }

    for c := 0; c < runtime.NumCPU(); c++ {
        consumer_wg.Add(1)
        go consumer()
    }

    producer_wg.Wait()

    close(resultingChannel)

    consumer_wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)

我将close-statement放入main函数的原因是因为我们有多个生产者.在上面的示例中关闭一个生成器中的通道会导致您遇到的问题(在封闭的通道上写入;原因是可能还有一个生产者仍在生成数据).只有当没有生产者离开时才应关闭渠道(因此我建议仅由生产者关闭渠道).这就是在Go中构建频道的方式.在这里,您可以找到关于结束频道的更多信息.


与sharemem示例相关:AFAICS此示例通过一次又一次地重新排队资源(从pending - > complete - > pending - > complete ...等等)来运行.这就是main-func结束时的迭代.它接收已完成的资源,并使用Resource.Sleep()将它们重新排入待处理状态.当没有完成的资源时,它会等待并阻止新资源完成.因此,不需要关闭通道,因为它们一直在使用.