Sam*_*mpa 9 unix concurrency multithreading elixir go
精简版:
是否有可能在Golang产卵一些外部程序(shell命令)的并行,使得它不会启动一个操作系统线程每外部流程...并仍然能够接收它的输出,当它完成?
更长的版本:
在Elixir中,如果使用端口,则可以生成数千个外部进程,而不会真正增加Erlang虚拟机中的线程数.
例如,下面的代码片段(启动2500个外部sleep进程)仅由Erlang VM下的20个操作系统线程管理:
defmodule Exmultiproc do
for _ <- 1..2500 do
cmd = "sleep 3600"
IO.puts "Starting another process ..."
Port.open({:spawn, cmd}, [:exit_status, :stderr_to_stdout])
end
System.cmd("sleep", ["3600"])
end
Run Code Online (Sandbox Code Playgroud)
(如果设置ulimit -n为高数字,例如10000)
另一方面,Go中的以下代码(它应该执行相同的操作 - 启动2500个外部sleep进程)也会启动2500个操作系统线程.所以它显然会启动一个操作系统线程 per(阻塞?)系统调用(以便不阻塞整个CPU,或类似,如果我理解正确):
package main
import (
"fmt"
"os/exec"
"sync"
)
func main() {
wg := new(sync.WaitGroup)
for i := 0; i < 2500; i++ {
wg.Add(1)
go func(i int) {
fmt.Println("Starting sleep ", i, "...")
cmd := exec.Command("sleep", "3600")
_, err := cmd.Output()
if err != nil {
panic(err)
}
fmt.Println("Finishing sleep ", i, "...")
wg.Done()
}(i)
}
fmt.Println("Waiting for WaitGroup ...")
wg.Wait()
fmt.Println("WaitGroup finished!")
}
Run Code Online (Sandbox Code Playgroud)
因此,我想知道是否有办法编写Go代码,以便它执行与Elixir代码类似的操作,而不是每个外部进程打开一个操作系统线程?
我基本上正在寻找一种方法来管理至少几千个外部长时间运行(最多10天)的进程,这种方式可以在操作系统中对任何虚拟或物理限制造成尽可能少的问题.
(对不起代码中的任何错误,因为我是Elixir的新手,而且是Go的新手.我很想知道我正在做的任何错误.)
编辑:澄清了并行运行长时间运行流程的要求.
我发现如果我们不wait处理,Go运行时将不会启动2500 operating system threads。所以请使用 cmd.Start() 而不是 cmd.Output()。
但似乎不可能stdout通过 golang os 包在不消耗操作系统线程的情况下读取进程。我认为这是因为 os 包不使用非块 io 来读取管道。
下面的底部程序在我的 Linux 上运行良好,尽管它阻止了进程的标准输出,正如 @JimB 在评论中所说,也许是因为我们的输出很小并且适合系统缓冲区。
func main() {
concurrentProcessCount := 50
wtChan := make(chan *result, concurrentProcessCount)
for i := 0; i < concurrentProcessCount; i++ {
go func(i int) {
fmt.Println("Starting process ", i, "...")
cmd := exec.Command("bash", "-c", "for i in 1 2 3 4 5; do echo to sleep $i seconds;sleep $i;echo done;done;")
outPipe,_ := cmd.StdoutPipe()
err := cmd.Start()
if err != nil {
panic(err)
}
<-time.Tick(time.Second)
fmt.Println("Finishing process ", i, "...")
wtChan <- &result{cmd.Process, outPipe}
}(i)
}
fmt.Println("root:",os.Getpid());
waitDone := 0
forLoop:
for{
select{
case r:=<-wtChan:
r.p.Wait()
waitDone++
output := &bytes.Buffer{}
io.Copy(output, r.b)
fmt.Println(waitDone, output.String())
if waitDone == concurrentProcessCount{
break forLoop
}
}
}
}
Run Code Online (Sandbox Code Playgroud)