正确地将stdin上的数据传递给命令并从golang中的该命令的stdout接收数据

Wre*_* T. 11 go

我有以下程序:

package main

import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    go func() {
            // Removing the following lines allow some output
            // to be fetched from cat's stdout sometimes
            time.Sleep(5 * time.Second)
            io.Copy(os.Stdout, stdout)
    }()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
Run Code Online (Sandbox Code Playgroud)

在循环中运行时,我得不到任何结果,如下所示:

$ while true; do go run cat_thingy.go; echo ; done



^C
Run Code Online (Sandbox Code Playgroud)

这个结果来自于在虚拟机中从apt安装gobu-go on a Ubuntu 12.04(go go go1).我无法在Macbook Air上进行复制(转到版本go1.0.3).这似乎是某种竞争条件.事实上,如果我进入睡眠状态(1*time.Second),我从未在代码中以随机睡眠为代价看到问题.

我在代码中做错了什么,或者这是一个错误?如果它是一个错误,它已被修复?

更新:可能的线索

我发现Command.Wait将关闭与cat子进程通信的管道,即使它们仍有未读数据.我真的不确定处理它的正确方法.我想我可以创建一个通道,当写入stdin完成时通知,但我仍然需要知道cat进程是否已经结束以确保没有其他任何东西将被写入其stdout管道.我知道我可以使用cmd.Process.Wait来确定进程何时结束,但是然后调用cmd.Wait是否安全?

更新:越来越近

这是代码的新内容.我相信这可以写到stdin并从stdout读取.我想如果我从stdout处理goroutine替换io.Copy而没有流的话,我可以使它正确地流式传输数据(而不是全部缓冲).

package main

import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6

func main() {
    runtime.GOMAXPROCS(5)
    runCatFromStdin(populateStdin(numInputBlocks))
}

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
        for i := 0; i < numInputBlocks; i++ {
          repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
          fmt.Printf("%s\n", repeatedBytes)
          io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
        }
    }
}

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    output_done_channel := make(chan bool)
    go func() {
        out_bytes := new(bytes.Buffer)
        io.Copy(out_bytes, stdout)
        fmt.Printf("%s\n", out_bytes)
        fmt.Println(out_bytes.Len())
        fmt.Println(inputBufferBlockLength*numInputBlocks)
        output_done_channel <- true
    }()
    <-output_done_channel
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
Run Code Online (Sandbox Code Playgroud)

Nic*_*ood 5

这是您的第一个代码的一个版本。请注意添加了 sync.WaitGroup 以确保在关闭命令之前完成发送和接收 go 例程。

package main

import (
    "bytes"
    "io"
    "log"
    "os"
    "os/exec"
    "sync"
    "time"
)

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        populate_stdin_func(stdin)
    }()
    go func() {
        defer wg.Done()
        time.Sleep(5 * time.Second)
        io.Copy(os.Stdout, stdout)
    }()
    wg.Wait()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
Run Code Online (Sandbox Code Playgroud)

(这只是@peterSO 所说的另一种说法;-)


pet*_*rSO 0

Go 语句

“go”语句在同一地址空间内作为独立的并发控制线程或 goroutine 启动函数或方法调用的执行。

GoStmt =“go”表达式。

该表达式必须是一个调用。函数值和参数在调用 goroutine 中照常计算,但与常规调用不同,程序执行不会等待调用的函数完成。相反,该函数开始在新的 goroutine 中独立执行。当函数终止时,它的 goroutine 也会终止。如果函数有任何返回值,则在函数完成时它们将被丢弃。

将无用的 goroutine 转换为函数调用。

package main

import (
    "bytes"
    "io"
    "log"
    "os"
    "os/exec"
)

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    populate_stdin_func(stdin)
    io.Copy(os.Stdout, stdout)
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 您的代码可以工作,因为我的示例中的管道缓冲区永远不会满。将 goroutine 更改为函数调用通常是行不通的。一般情况下,cat进程用来通信的管道都会有一定大小的缓冲区。例如,stdin 管道有一定的缓冲区。一旦缓冲区满了,对管道的写入将被阻塞。在 Linux 上,我认为缓冲区大小为 64KiB。标准输出的 cat 管道上也会存在类似的问题。在主代码中执行阻塞 I/O 意味着这些阻塞调用将阻塞主代码。 (2认同)