你如何定义一个在Golang中一次执行的goroutine池?

66 multithreading go goroutine

TL; TR:请转到最后一部分告诉我你将如何解决这个问题.

我今天早上开始使用Golang来自Python.我想从Go调用一个闭源可执行文件几次,带有一点并发性,使用不同的命令行参数.我生成的代码工作正常,但我想得到你的输入以改进它.由于我处于早期学习阶段,我还将解释我的工作流程.

为简单起见,这里假设这个"外部闭源程序"是zenity一个Linux命令行工具,可以从命令行显示图形消息框.

从Go调用可执行文件

所以,在Go中,我会这样:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)

这应该是正常的.注意,这.Run()是一个功能等同于.Start()后面跟着.Wait().这很棒,但如果我只想执行一次这个程序,整个编程的东西就不值得了.所以,让我们多次这样做.

多次调用可执行文件

现在我有了这个工作,我想用自定义命令行参数多次调用我的程序(这里只是i为了简单起见).

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}
Run Code Online (Sandbox Code Playgroud)

好的,我们做到了!但我仍然看不到Go over Python的优势......这段代码实际上是以串行方式执行的.我有一个多核CPU,我想利用它.所以让我们用goroutines添加一些并发性.

Goroutines,或者让我的程序并行的方法

a)第一次尝试:到处添加"go"

让我们重写代码,使事情更容易调用和重用,并添加着名的go关键字:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)

没有!问题是什么?所有的goroutine都会立即执行.我真的不知道为什么zenity没有被执行但是AFAIK,Go程序在zenity外部程序甚至可以初始化之前退出.这可以通过以下方式得到证实time.Sleep:等待几秒就足以让8个zenity实例自行启动.我不知道这是否可以被认为是一个bug.

更糟糕的是,我真正喜欢调用的真实程序需要一段时间才能执行.如果我在我的4核CPU上并行执行该程序的8个实例,那将浪费一些时间进行大量的上下文切换...我不知道Go Go goroutines有多么简单,但是exec.Command 在8个不同的线程中启动zenity 8次.更糟糕的是,我希望执行此程序超过100,000次.在goroutines中一次完成所有这些工作根本不会有效.不过,我还是想利用我的4核CPU!

b)第二次尝试:使用goroutines池

在线资源倾向于建议使用sync.WaitGroup这种工作.这种方法的问题在于你基本上使用批量的goroutine:如果我创建了4个成员的WaitGroup,Go程序将等待所有 4个外部程序完成,然后再调用一批4个程序.这样效率不高:CPU再次被浪费了.

其他一些资源建议使用缓冲通道来完成工作:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)

这看起来很难看.渠道不是为了这个目的:我正在利用副作用.我喜欢这个概念,defer但我讨厌必须声明一个函数(甚至是lambda)来从我创建的虚拟通道中弹出一个值.哦,当然,使用虚拟通道本身就是丑陋的.

c)第三次尝试:当所有孩子都死了之后死去

现在我们差不多完成了.我只想考虑另一个副作用:Go程序在所有zenity弹出窗口关闭之前关闭.这是因为当循环完成时(在第8次迭代),没有什么能阻止程序完成.这一次,sync.WaitGroup将是有用的.

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)

完成.

我的问题

  • 你知道任何其他正确的方法来限制一次执行的goroutines的数量吗?

我不是指线程; Go如何在内部管理goroutines是不相关的.我的意思是限制一次启动的goroutine的数量:exec.Command每次调用时创建一个新线程,所以我应该控制它被调用的时间.

  • 那段代码看起来不错吗?
  • 你知道在这种情况下如何避免使用虚拟通道吗?

我无法说服自己这样的虚拟通道是可行的.

tux*_*21b 88

我会产生4个工作器goroutine,它们从公共通道读取任务.Goroutines比其他人更快(因为他们的安排不同或碰巧得到简单的任务)将从这个渠道获得比其他任务更多的任务.除此之外,我还会使用sync.WaitGroup等待所有工作人员完成.剩下的部分就是创建任务.您可以在此处查看该方法的示例实现:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)

可能有其他可能的方法,但我认为这是一个非常简洁的解决方案,易于理解.

  • 不,它没有.上面的代码使用单个SPMC(单个生产者/多个使用者)队列将任务分配给不同的工作者.每个命令只能从任务通道接收一次. (4认同)
  • "范围任务"将迭代,直到任务通道没有更多消息? (2认同)

zzz*_*zzz 34

一种简单的限制方法(执行f()N次但最多maxConcurrency同时执行),只是一个方案:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}
Run Code Online (Sandbox Code Playgroud)

操场

我不太可能称该throttle频道为"虚拟".恕我直言,这是一种优雅的方式(当然不是我的发明),如何限制并发.

顺便说一句:请注意你忽略了返回的错误cmd.Run().

  • @ tux21b:欢迎您正式证明这是错误的;-)同时,让我试着证明相反:_Before_开始一个新的goroutine必须插入一个"令牌"(`throttle < - 1`)油门通道`.只有_after_`f()`完成其任何处理,才会从同一个通道中删除一个"标记"(`<-throttle`).由于通道具有固定容量`maxConcurrency`,因此排队的行数永远不会超过`maxConcurrency`.因此,不可能有超过`f()`处理数据的`maxConcurrency`并发实例.顺便说一句:上面没有"同步数据". (3认同)