Go中的惯用可变大小工作池

Hut*_*ut8 9 concurrency semaphore worker-process go goroutine

我正试图在Go中实现一个工作池.在去维基(和有效的围棋频道部分)功能限制资源利用的很好的例子.只需创建一个具有与工作池一样大的缓冲区的通道.然后用工人填充该频道,并在完成后将其发送回频道.从通道块接收,直到工作人员可用.所以频道和循环是整个实现 - 非常酷!

或者,可以阻止发送到频道,但同样的想法.

我的问题是在工作池运行时改变它的大小.我不相信有办法改变频道的大小.我有一些想法,但大多数看起来都太复杂了. 这个页面实际上使用一个通道和空结构以大致相同的方式实现了一个信号量,但它有同样的问题(这些东西一直在谷歌搜索"golang信号量".

tux*_*21b 20

我会反过来这样做.我没有产生许多goroutine(仍然需要相当大的内存)并使用通道来阻止它们,而是将工作者建模为goroutines并使用通道来分配工作.像这样的东西:

package main

import (
    "fmt"
    "sync"
)

type Task string

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            fmt.Println("processing task", task)
        case <-quit:
            return
        }
    }
}

func main() {
    tasks := make(chan Task, 128)
    quit := make(chan bool)
    var wg sync.WaitGroup

    // spawn 5 workers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute some tasks
    tasks <- Task("foo")
    tasks <- Task("bar")

    // remove two workers
    quit <- true
    quit <- true

    // add three more workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute more tasks
    for i := 0; i < 20; i++ {
        tasks <- Task(fmt.Sprintf("additional_%d", i+1))
    }

    // end of tasks. the workers should quit afterwards
    close(tasks)
    // use "close(quit)", if you do not want to wait for the remaining tasks

    // wait for all workers to shut down properly
    wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)

使用一些方便的方法创建单独的WorkerPool类型可能是个好主意.此外,type Task string使用一个结构也很常见,该结构还包含一个done用于表示任务已成功执行的通道.

编辑:我玩了很多,并想出了以下内容:http://play.golang.org/p/VlEirPRk8V.它基本上是相同的例子,具有更好的API.