66 multithreading go goroutine
TL; TR:请转到最后一部分告诉我你将如何解决这个问题.
我今天早上开始使用Golang来自Python.我想从Go调用一个闭源可执行文件几次,带有一点并发性,使用不同的命令行参数.我生成的代码工作正常,但我想得到你的输入以改进它.由于我处于早期学习阶段,我还将解释我的工作流程.
为简单起见,这里假设这个"外部闭源程序"是zenity一个Linux命令行工具,可以从命令行显示图形消息框.
所以,在Go中,我会这样:
package main
import "os/exec"
func main() {
cmd := exec.Command("zenity", "--info", "--text='Hello World'")
cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)
这应该是正常的.注意,这.Run()是一个功能等同于.Start()后面跟着.Wait().这很棒,但如果我只想执行一次这个程序,整个编程的东西就不值得了.所以,让我们多次这样做.
现在我有了这个工作,我想用自定义命令行参数多次调用我的程序(这里只是i为了简单起见).
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
for i:=0; i<NumEl; i++ {
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
}
Run Code Online (Sandbox Code Playgroud)
好的,我们做到了!但我仍然看不到Go over Python的优势......这段代码实际上是以串行方式执行的.我有一个多核CPU,我想利用它.所以让我们用goroutines添加一些并发性.
让我们重写代码,使事情更容易调用和重用,并添加着名的go关键字:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8
for i:=0; i<NumEl; i++ {
go callProg(i) // <--- There!
}
}
func callProg(i int) {
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)
没有!问题是什么?所有的goroutine都会立即执行.我真的不知道为什么zenity没有被执行但是AFAIK,Go程序在zenity外部程序甚至可以初始化之前退出.这可以通过以下方式得到证实time.Sleep:等待几秒就足以让8个zenity实例自行启动.我不知道这是否可以被认为是一个bug.
更糟糕的是,我真正喜欢调用的真实程序需要一段时间才能执行.如果我在我的4核CPU上并行执行该程序的8个实例,那将浪费一些时间进行大量的上下文切换...我不知道Go Go goroutines有多么简单,但是exec.Command 会在8个不同的线程中启动zenity 8次.更糟糕的是,我希望执行此程序超过100,000次.在goroutines中一次完成所有这些工作根本不会有效.不过,我还是想利用我的4核CPU!
在线资源倾向于建议使用sync.WaitGroup这种工作.这种方法的问题在于你基本上使用批量的goroutine:如果我创建了4个成员的WaitGroup,Go程序将等待所有 4个外部程序完成,然后再调用一批4个程序.这样效率不高:CPU再次被浪费了.
其他一些资源建议使用缓冲通道来完成工作:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
for i:=0; i<NumEl; i++ {
go callProg(i, c)
c <- true // At the NumCoreth iteration, c is blocking
}
}
func callProg(i int, c chan bool) {
defer func () {<- c}()
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)
这看起来很难看.渠道不是为了这个目的:我正在利用副作用.我喜欢这个概念,defer但我讨厌必须声明一个函数(甚至是lambda)来从我创建的虚拟通道中弹出一个值.哦,当然,使用虚拟通道本身就是丑陋的.
现在我们差不多完成了.我只想考虑另一个副作用:Go程序在所有zenity弹出窗口关闭之前关闭.这是因为当循环完成时(在第8次迭代),没有什么能阻止程序完成.这一次,sync.WaitGroup将是有用的.
package main
import (
"os/exec"
"strconv"
"sync"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
wg := new(sync.WaitGroup)
wg.Add(NumEl) // Set the number of goroutines to (0 + NumEl)
for i:=0; i<NumEl; i++ {
go callProg(i, c, wg)
c <- true // At the NumCoreth iteration, c is blocking
}
wg.Wait() // Wait for all the children to die
close(c)
}
func callProg(i int, c chan bool, wg *sync.WaitGroup) {
defer func () {
<- c
wg.Done() // Decrease the number of alive goroutines
}()
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
Run Code Online (Sandbox Code Playgroud)
完成.
我不是指线程; Go如何在内部管理goroutines是不相关的.我的意思是限制一次启动的goroutine的数量:exec.Command每次调用时创建一个新线程,所以我应该控制它被调用的时间.
我无法说服自己这样的虚拟通道是可行的.
tux*_*21b 88
我会产生4个工作器goroutine,它们从公共通道读取任务.Goroutines比其他人更快(因为他们的安排不同或碰巧得到简单的任务)将从这个渠道获得比其他任务更多的任务.除此之外,我还会使用sync.WaitGroup等待所有工作人员完成.剩下的部分就是创建任务.您可以在此处查看该方法的示例实现:
package main
import (
"os/exec"
"strconv"
"sync"
)
func main() {
tasks := make(chan *exec.Cmd, 64)
// spawn four worker goroutines
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
for cmd := range tasks {
cmd.Run()
}
wg.Done()
}()
}
// generate some tasks
for i := 0; i < 10; i++ {
tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
}
close(tasks)
// wait for the workers to finish
wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)
可能有其他可能的方法,但我认为这是一个非常简洁的解决方案,易于理解.
zzz*_*zzz 34
一种简单的限制方法(执行f()N次但最多maxConcurrency同时执行),只是一个方案:
package main
import (
"sync"
)
const maxConcurrency = 4 // for example
var throttle = make(chan int, maxConcurrency)
func main() {
const N = 100 // for example
var wg sync.WaitGroup
for i := 0; i < N; i++ {
throttle <- 1 // whatever number
wg.Add(1)
go f(i, &wg, throttle)
}
wg.Wait()
}
func f(i int, wg *sync.WaitGroup, throttle chan int) {
defer wg.Done()
// whatever processing
println(i)
<-throttle
}
Run Code Online (Sandbox Code Playgroud)
我不太可能称该throttle频道为"虚拟".恕我直言,这是一种优雅的方式(当然不是我的发明),如何限制并发.
顺便说一句:请注意你忽略了返回的错误cmd.Run().
| 归档时间: |
|
| 查看次数: |
28169 次 |
| 最近记录: |