读取子进程的 stdout 和 stderr 的竞争条件

Tom*_*her 5 timeout go race-condition child-process goroutine

在 Go 中,我试图:

  1. 启动一个子进程
  2. 分别从 stdout 和 stderr 读取
  3. 实施整体超时

经过大量谷歌搜索后,我们想出了一些在大多数情况下似乎可以完成这项工作的代码。但似乎存在竞争条件,无法读取某些输出。

该问题似乎只出现在Linux 上,而不是Windows 上。

按照通过谷歌找到的最简单的解决方案,我们尝试创建一个超时的上下文:

context.WithTimeout(context.Background(), 10*time.Second)
Run Code Online (Sandbox Code Playgroud)

虽然这在大多数情况下都有效,但我们也能找到它永远挂起的情况。子进程的某些方面导致了死锁。(与未充分脱离子进程的孙子进程有关,从而导致子进程永远无法完全退出。)

另外,在某些情况下,似乎在error发生超时时返回的 表示超时,但只有进程实际退出后才会传递(从而使超时的整个概念变得毫无用处)。

func GetOutputsWithTimeout(command string, args []string, timeout int) (io.ReadCloser, io.ReadCloser, int, error) {
    start := time.Now()
    procLogger.Tracef("Initializing %s %+v", command, args)
    cmd := exec.Command(command, args...)

    // get pipes to standard output/error
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return emptyReader(), emptyReader(), -1, fmt.Errorf("cmd.StdoutPipe() error: %+v", err.Error())
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
        return emptyReader(), emptyReader(), -1, fmt.Errorf("cmd.StderrPipe() error: %+v", err.Error())
    }

    // setup buffers to capture standard output and standard error
    var buf bytes.Buffer
    var ebuf bytes.Buffer

    // create a channel to capture any errors from wait
    done := make(chan error)
    // create a semaphore to indicate when both pipes are closed
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        if _, err := buf.ReadFrom(stdout); err != nil {
            procLogger.Debugf("%s: Error Slurping stdout: %+v", command, err)
        }
        wg.Done()
    }()
    go func() {
        if _, err := ebuf.ReadFrom(stderr); err != nil {
            procLogger.Debugf("%s: Error  Slurping stderr: %+v", command, err)
        }
        wg.Done()
    }()

    // start process
    procLogger.Debugf("Starting %s", command)
    if err := cmd.Start(); err != nil {
        procLogger.Errorf("%s: failed to start: %+v", command, err)
        return emptyReader(), emptyReader(), -1, fmt.Errorf("cmd.Start() error: %+v", err.Error())
    }

    go func() {
        procLogger.Debugf("Waiting for %s (%d) to finish", command, cmd.Process.Pid)
        err := cmd.Wait()                                             // this can  be 'forced' by the killing of the process
        procLogger.Tracef("%s finished: errStatus=%+v", command, err) // err could be nil here
        //notify select of completion, and the status
        done <- err
    }()

    // Wait for timeout or completion.
    select {
    // Timed out
    case <-time.After(time.Duration(timeout) * time.Second):
        elapsed := time.Since(start)
        procLogger.Errorf("%s: timeout after %.1f\n", command, elapsed.Seconds())
        if err := TerminateTree(cmd); err != nil {
            return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), -1,
                fmt.Errorf("failed to kill %s, pid=%d: %+v",
                    command, cmd.Process.Pid, err)
        }
        wg.Wait() // this *should* take care of waiting for stdout and stderr to be collected after we killed the process
        return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), -1,
            fmt.Errorf("%s: timeout %d s reached, pid=%d process killed",
                command, timeout, cmd.Process.Pid)
    //Exited normally or with a non-zero exit code
    case err := <-done:
        wg.Wait() // this *should* take care of waiting for stdout and stderr to be collected after the process terminated naturally.
        elapsed := time.Since(start)
        procLogger.Tracef("%s: Done after %.1f\n", command, elapsed.Seconds())
        rc := -1
        // Note that we have to use go1.10 compatible mechanism.
        if err != nil {
            procLogger.Tracef("%s exited with error: %+v", command, err)
            exitErr, ok := err.(*exec.ExitError)
            if ok {
                ws := exitErr.Sys().(syscall.WaitStatus)
                rc = ws.ExitStatus()
            }
            procLogger.Debugf("%s exited with status %d", command, rc)
            return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), rc,
                fmt.Errorf("%s: process done with error: %+v",
                    command, err)
        } else {
            ws := cmd.ProcessState.Sys().(syscall.WaitStatus)
            rc = ws.ExitStatus()
        }
        procLogger.Debugf("%s exited with status %d", command, rc)
        return ioutil.NopCloser(&buf), ioutil.NopCloser(&ebuf), rc, nil
    }
    //NOTREACHED: should not reach this line!
}
Run Code Online (Sandbox Code Playgroud)

大多数情况下,调用GetOutputsWithTimeout("uname",[]string{"-mpi"},10)将返回预期的单行输出。但有时它不会返回任何输出,就好像读取 stdout 的 goroutine 没有足够快地启动来“捕获”所有输出(或提前退出?)“大多数时候”强烈表明存在竞争条件。

有时我们还会看到来自 goroutine 的有关“文件已关闭”的错误(这似乎是在超时条件下发生的,但也会在其他“正常”时间发生)。

我本以为在 the 之前启动 goroutinecmd.Start()可以确保不会丢失任何输出,并且使用 theWaitGroup可以保证它们在读取缓冲区之前完成。

那么我们怎么会缺少输出呢?两个“reader” goroutine 和 之间是否仍然存在竞争条件cmd.Start()?我们应该确保这两个正在使用另一个运行WaitGroup吗?

还是执行上有问题ReadFrom()

请注意,由于与旧操作系统的向后兼容性问题,我们目前使用 go1.10,但 go1.12.4 也会出现相同的效果。

或者我们是否想得太多了,一个简单的实现context.WithTimeout()就可以完成这项工作?

Dir*_*aio 0

但有时它不会返回任何输出,就好像读取 stdout 的 goroutine 没有足够快地启动来“捕获”所有输出

这是不可能的,因为管道不能“丢失”数据。如果进程正在写入 stdout 而 Go 程序尚未读取,则进程将阻塞。

解决该问题的最简单方法是:

  • 启动 goroutine 来收集 stdout、stderr
  • 启动一个计时器来终止进程
  • 启动流程
  • 使用 .Wait() 等待它完成(或被计时器杀死)
  • 如果计时器被触发,则返回超时错误
  • 处理等待错误

func GetOutputsWithTimeout(command string, args []string, timeout int) ([]byte, []byte, int, error) {
    cmd := exec.Command(command, args...)

    // get pipes to standard output/error
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return nil, nil, -1, fmt.Errorf("cmd.StdoutPipe() error: %+v", err.Error())
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
        return nil, nil, -1, fmt.Errorf("cmd.StderrPipe() error: %+v", err.Error())
    }

    // setup buffers to capture standard output and standard error
    var stdoutBuf, stderrBuf []byte

    // create 3 goroutines: stdout, stderr, timer.
    // Use a waitgroup to wait.
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        var err error
        if stdoutBuf, err = ioutil.ReadAll(stdout); err != nil {
            log.Printf("%s: Error Slurping stdout: %+v", command, err)
        }
        wg.Done()
    }()
    go func() {
        var err error
        if stderrBuf, err = ioutil.ReadAll(stderr); err != nil {
            log.Printf("%s: Error Slurping stderr: %+v", command, err)
        }
        wg.Done()
    }()

    t := time.AfterFunc(time.Duration(timeout)*time.Second, func() {
        cmd.Process.Kill()
    })

    // start process
    if err := cmd.Start(); err != nil {
        t.Stop()
        return nil, nil, -1, fmt.Errorf("cmd.Start() error: %+v", err.Error())
    }

    err = cmd.Wait()
    timedOut := !t.Stop()
    wg.Wait()

    // check if the timer timed out.
    if timedOut {
        return stdoutBuf, stderrBuf, -1,
            fmt.Errorf("%s: timeout %d s reached, pid=%d process killed",
                command, timeout, cmd.Process.Pid)
    }

    if err != nil {
        rc := -1
        if exitErr, ok := err.(*exec.ExitError); ok {
            rc = exitErr.Sys().(syscall.WaitStatus).ExitStatus()
        }
        return stdoutBuf, stderrBuf, rc,
            fmt.Errorf("%s: process done with error: %+v",
                command, err)
    }

    // cmd.Wait docs say that if err == nil, exit code is 0
    return stdoutBuf, stderrBuf, 0, nil
}

Run Code Online (Sandbox Code Playgroud)