始终有x个goroutines运行

Ala*_*air 27 go goroutine

我看到很多关于如何使Go等待x个goroutine完成的教程和示例,但我要做的是确保总是有x个数字运行,所以一旦结束就会启动一个新的goroutine .

具体来说,我有几十个'要做的事',它正在处理一些来自MySQL的东西.它的工作原理如下:

db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()

rows,err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close()

var id uint
for rows.Next() {
    err := rows.Scan(&id)
    checkErr(err)
    go processTheThing(id)
    }
checkErr(err)
rows.Close()
Run Code Online (Sandbox Code Playgroud)

目前,将推出数十万个线程processTheThing().我需要的是启动最多x个数字(我们称之为20个)goroutines.所以它首先为前20行启动20,然后从那时开始,它会在当前goroutine之一完成时为下一个id启动一个新的goroutine.所以在任何时候总有20个跑步.

我确信这是非常简单/标准的,但我似乎无法找到任何教程或示例或如何完成的良好解释.

art*_*yom 38

您可能会发现Go Concurrency Patterns文章很有趣,尤其是Bounded parallelism部分,它解释了您需要的确切模式.

您可以使用空结构的通道作为限制保护来控制并发工作器goroutines的数量:

package main

import "fmt"

func main() {
    maxGoroutines := 10
    guard := make(chan struct{}, maxGoroutines)

    for i := 0; i < 30; i++ {
        guard <- struct{}{} // would block if guard channel is already filled
        go func(n int) {
            worker(n)
            <-guard
        }(i)
    }
}

func worker(i int) { fmt.Println("doing work on", i) }
Run Code Online (Sandbox Code Playgroud)

  • Main 在这些 goroutine 可以打印之前退出。在此处查看更新的 play.golang.org 链接:https://play.golang.org/p/HovNRgp6FxH (4认同)
  • 如果要处理的项目少于“maxGoroutines”,则此方法不起作用。也没有等待整理工人。 (3认同)
  • @Muppet 使用空结构而不是 ie bool 更像是一种约定;在这里使用 bool 是可以的,但读者可能会认为 bool 值(true/false)可能有一些含义,但它们不在这个用例中。空结构也使用更少的内存,有关详细信息,请参见 https://dave.cheney.net/2014/03/25/the-empty-struct (2认同)
  • 这个答案具有误导性,以至于让我怀疑它有这么多的赞成票;main 将在所有 goroutine 有时间完成之前退出;按照 @Naikrovek 的建议合并 `sync.WaitGroup` 似乎更合适 (2认同)

Ala*_*air 16

感谢大家帮助我解决这个问题.但是,我并不觉得有人真的提供了既有效又简单/易懂的东西,尽管你们都帮我理解了这项技术.

我最后做的是作为对我的具体问题的回答,我认为更容易理解和实际,所以我会在这里发布,以防其他人有同样的问题.

不知何故,这看起来很像OneOfOne发布的内容,这很好,因为现在我明白了.但OneOfOne的代码我发现起初很难理解,因为函数的函数传递让人很难理解什么是什么.我认为这种方式更有意义:

package main

import (
"fmt"
"sync"
)

const xthreads = 5 // Total number of threads to use, excluding the main() thread

func doSomething(a int) {
    fmt.Println("My job is",a)
    return
}

func main() {
    var ch = make(chan int, 50) // This number 50 can be anything as long as it's larger than xthreads
    var wg sync.WaitGroup

    // This starts xthreads number of goroutines that wait for something to do
    wg.Add(xthreads)
    for i:=0; i<xthreads; i++ {
        go func() {
            for {
                a, ok := <-ch
                if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
                    wg.Done()
                    return
                }
                doSomething(a) // do the thing
            }
        }()
    }

    // Now the jobs can be added to the channel, which is used as a queue
    for i:=0; i<50; i++ {
        ch <- i // add i to the queue
    }

    close(ch) // This tells the goroutines there's nothing else to do
    wg.Wait() // Wait for the threads to finish
}
Run Code Online (Sandbox Code Playgroud)

  • 将 goroutine 称为“线程”有点误导,我只是将它们称为“goroutines” (4认同)

Grz*_*Żur 14

  1. 创建将数据传递给goroutines的渠道.
  2. 启动20个goroutine,在循环中处理来自通道的数据.
  3. 将数据发送到频道而不是开始新的goroutine.


Emi*_*yan 14

在这里,我认为像这样简单的东西会起作用:

package main

import "fmt"

const MAX = 20

func main() {
    sem := make(chan int, MAX)
    for {
        sem <- 1 // will block if there is MAX ints in sem
        go func() {
            fmt.Println("hello again, world")
            <-sem // removes an int from sem, allowing another to proceed
        }()
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 这是一个很好且简单的解决方案!然而,对于考虑使用它的人来说,我建议使用一个空的结构通道:`make(chan struct{}, MAX)`。这样,读者就很清楚我们实际上并不关心通道上传递的值。作为奖励,空结构在内存中使用 0 个字节。 (8认同)

One*_*One 10

GrzegorzŻur的答案是最有效的方法,但对于新手来说,如果不阅读代码就很难实现,所以这里有一个非常简单的实现:

type idProcessor func(id uint)

func SpawnStuff(limit uint, proc idProcessor) chan<- uint {
    ch := make(chan uint)
    for i := uint(0); i < limit; i++ {
        go func() {
            for {
                id, ok := <-ch
                if !ok {
                    return
                }
                proc(id)
            }
        }()
    }
    return ch
}

func main() {
    runtime.GOMAXPROCS(4)
    var wg sync.WaitGroup //this is just for the demo, otherwise main will return
    fn := func(id uint) {
        fmt.Println(id)
        wg.Done()
    }
    wg.Add(1000)
    ch := SpawnStuff(10, fn)
    for i := uint(0); i < 1000; i++ {
        ch <- i
    }
    close(ch) //should do this to make all the goroutines exit gracefully
    wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)

playground