缓冲区为空后关闭"worker"进入例行程序

Jam*_*ell 2 concurrency shutdown channel go goroutine

我希望我的常规工作者(ProcessToDo()在下面的代码中)等待,直到关闭之前处理所有"排队"工作.

工作程序具有"待办事项"通道(缓冲),通过该通道向其发送工作.它有一个"完成"通道告诉它开始关机.文档说如果满足多个选择,通道上的选择将选择"伪随机值"...这意味着在完成所有缓冲工作之前触发关闭(返回).

在下面的代码示例中,我希望打印所有20条消息...

package main

import (
    "time"
    "fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
    for {
        select {
        case work, ok := <-todo:
            if !ok {
                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
                return
            }
            fmt.Printf("todo: %q\n", work)
            time.Sleep(100 * time.Millisecond)
        case _, ok := <-done:
            if ok {
                fmt.Printf("Shutting down ProcessToDo - done message received!\n")
            } else {
                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
            }
            close(todo)
            return
        }
    }
}

func main() {

    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    time.Sleep(1 * time.Second)
    close(done)
    time.Sleep(4 * time.Second)
}
Run Code Online (Sandbox Code Playgroud)

icz*_*cza 8

done在您的情况下通道完全没必要,因为您可以通过关闭todo通道本身来发出关闭信号.

并使用for range通道进行迭代,直到通道关闭且其缓冲区为空.

你应该有一个done频道,但只有这样goroutine本身才能发出信号表明它完成了工作,所以主要的goroutine可以继续或退出.

这个变体等同于你的变体,更简单,并且不需要time.Sleep()调用等待其他goroutine(无论如何这都是错误的和不确定的).在Go Playground尝试:

func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
    done <- struct{}{} // Signal that we processed all jobs
}

func main() {
    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    <-done // Wait until the other goroutine finishes all jobs
}
Run Code Online (Sandbox Code Playgroud)

另请注意,工作人员goroutine应该发出完成信号,defer因此如果以某种意外的方式返回,或者恐慌,主goroutine将不会卡在等待工人.所以它应该像这样开始:

defer func() {
    done <- struct{}{} // Signal that we processed all jobs
}()
Run Code Online (Sandbox Code Playgroud)

您还可以使用sync.WaitGroup将主goroutine同步到worker(等待它).实际上,如果您计划使用多个worker goroutine,那么比从done通道中读取多个值更清晰.WaitGroup由于它有一个Done()方法(这是一个函数调用),所以它更简单地表示完成,因此您不需要匿名函数:

defer wg.Done()
Run Code Online (Sandbox Code Playgroud)

有关完整示例,请参阅JimB的anwserWaitGroup.

for range如果你想使用多个worker goroutine,那么使用the 也是惯用的:通道是同步的,因此你不需要任何额外的代码来同步对todo通道或从它接收的作业的访问.如果您关闭todo通道main(),那将正确发出所有工作人员的信号.但是,当然所有排队的工作都将被接收和处理一次.

现在采用WaitGroup 用于使主要goroutine等待工人的变体(JimB的答案):如果你想要超过1个工人goroutine怎么办; 同时处理你的工作(很可能并行)?

您需要添加/更改代码的唯一方法是:要真正启动多个代码:

for i := 0; i < 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}
Run Code Online (Sandbox Code Playgroud)

在不更改任何其他内容的情况下,您现在拥有一个正确的并发应用程序,它使用10个并发goroutine接收和处理您的作业.我们没有使用任何"丑陋" time.Sleep()(我们使用的只是模拟慢速处理,而不是等待其他goroutines),而且你不需要任何额外的同步.


Jim*_*imB 5

让通道的使用者关闭它通常是一个坏主意,因为在关闭的通道上发送是一种恐慌。

在这种情况下,如果您不想在所有消息发送之前中断消费者,只需使用for...range循环并在完成后关闭通道。您还需要一个类似 a 的信号WaitGroup来等待 goroutine 完成(而不是使用 time.Sleep)

http://play.golang.org/p/r97vRPsxEb

var wg sync.WaitGroup

func ProcessToDo(todo chan string) {
    defer wg.Done()
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)

    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")

}

func main() {
    todo := make(chan string, 100)
    wg.Add(1)
    go ProcessToDo(todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)