如何设置完成工作的最大 goroutine 数量?

cac*_*tus 0 go goroutine

var wg sync.WaitGroup
wg.Add(len(work))

sem := make(chan struct{}, 10)

wgDone := make(chan bool)

for i < len(work)-1 {
    go func() {
        defer wg.Done()
        sem <- struct{}{}
        defer func() {
            <-sem
        }()
        worker(work[i])
    }()
    i = i + 1
}

go func() {     
    wg.Wait()
    close(wgDone)
}()
Run Code Online (Sandbox Code Playgroud)

我一次只需要10 个新的 goroutine来执行这项工作。这是我当前的解决方案,它阻止 goroutine 继续运行,因此一次只有 10 个。我怎样才能改变这个,这样它就不会创建大量被阻塞等待工作的 goroutine,而是只创建 10 个完成所有工作的 goroutine?

was*_*mup 12

根据用例,以下方法之一很有用:

  1. 使用max新 goroutine 的数量和一个通道作为队列(Go 游乐场):
package main

import (
    "fmt"
    "sync"
)

func main() {
    const max = 10
    queue := make(chan int, max)
    wg := &sync.WaitGroup{}
    for i := 0; i < max; i++ {
        wg.Add(1)
        go worker(wg, queue)
    }
    for i := 0; i < 100; i++ {
        queue <- i
    }
    close(queue)
    wg.Wait()
    fmt.Println("Done")
}
func worker(wg *sync.WaitGroup, queue chan int) {
    defer wg.Done()
    for job := range queue {
        fmt.Print(job, " ") // a job
    }
}
Run Code Online (Sandbox Code Playgroud)
  1. 使用缓冲通道作为信号量将新的 goroutine 数量限制最大数量(Go Playground):
package main

import (
    "fmt"
    "sync"
)

func main() {
    const max = 10
    semaphore := make(chan struct{}, max)
    wg := &sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
        semaphore <- struct{}{} // acquire
        wg.Add(1)
        go limited(i, wg, semaphore)
    }
    wg.Wait()
    fmt.Println("Done")
}
func limited(i int, wg *sync.WaitGroup, semaphore chan struct{}) {
    defer wg.Done()
    fmt.Println("i =", i) // a job
    <-semaphore // release
}

Run Code Online (Sandbox Code Playgroud)
  1. 使用缓冲通道作为信号量将作业数量限制为最大数量 - 这里 goroutine 的数量超过最大数量(Go 游乐场):
package main

import (
    "fmt"
    "sync"
)

func main() {
    const max = 10
    semaphore := make(chan struct{}, max)
    wg := &sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go limited(i, wg, semaphore)
    }
    wg.Wait()
    fmt.Println("Done")
}
func limited(i int, wg *sync.WaitGroup, semaphore chan struct{}) {
    defer wg.Done()
    semaphore <- struct{}{} // acquire
    fmt.Println("i =", i) // a job
    <-semaphore // release
}
Run Code Online (Sandbox Code Playgroud)