没有阅读,如何检查频道是否关闭?

Rec*_*Hou 64 channel go

这是@Jimt编写的Go中的工作者和控制器模式的一个很好的例子,回答" 是否有一些优雅的方式来暂停和恢复golang中的任何其他goroutine? "

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                break
            }

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    for i := range workers {
        workers[i] <- Running
    }

    // Pause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Paused
    }

    // Unpause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Running
    }

    // Shutdown workers.
    <-time.After(1e9)
    for i := range workers {
        close(workers[i])
    }
}
Run Code Online (Sandbox Code Playgroud)

但是这段代码也存在一个问题:如果要在退出workers时删除工作通道,则会worker()发生死锁.

如果你close(workers[i]),下次控制器写入它会引起恐慌,因为go无法写入封闭的通道.如果你使用一些互斥锁来保护它,那么它会被卡住,workers[i] <- Running因为worker它没有从通道读取任何东西而且写入将被阻止,而互斥锁将导致死锁.您还可以为通道提供更大的缓冲区作为解决方案,但这还不够好.

因此我认为解决这个问题的最佳方法是worker()退出时关闭通道,如果控制器发现通道关闭,它将跳过它并且什么都不做.但在这种情况下,我无法找到如何检查通道是否已经关闭.如果我尝试读取控制器中的通道,则可能会阻止控制器.所以我现在非常困惑.

PS:我已经尝试过恢复引起恐慌的恐慌,但它会关闭引起恐慌的goroutine.在这种情况下,它将是控制器,所以它是没用的.

不过,我认为Go团队在下一版Go中实现这个功能很有用.

Dus*_*tin 67

没有办法编写一个安全的应用程序,你需要知道一个频道是否打开而不与它交互.

做你想要做的事情的最好方法是有两个渠道 - 一个用于工作,一个用于表示改变状态的愿望(以及完成状态改变,如果这很重要).

频道很便宜.复杂的设计重载语义不是.

[也]

<-time.After(1e9)
Run Code Online (Sandbox Code Playgroud)

写作是一种非常令人困惑和非显而易见的方式

time.Sleep(time.Second)
Run Code Online (Sandbox Code Playgroud)

保持简单,每个人(包括你)都能理解它们.

  • 该行+10:_Channels很便宜.复杂的设计重载语义不是._ (11认同)
  • 嗯,我是 GO 新手,但我认为 time.Sleep 会阻塞主线程,而 &lt;-time.After(...) 如果没有其他通道正在生成,则会阻塞,但不会阻塞任何其他正在查询的通道在选择里面。但我完全同意明确的说法。不过我明白你说的。 (4认同)

zzz*_*zzz 49

以一种hacky的方式,可以通过恢复引发的恐慌来尝试写入的频道.但是如果没有读取它就无法检查读取通道是否已关闭.

要么你会

  • 最终从它读取"真实"值(v <- c)
  • 读取"true"值和"not closed"指示符(v, ok <- c)
  • 读取零值和'关闭'指标(v, ok <- c)
  • 会在频道中阻止永远读取(v <- c)

从技术上讲,只有最后一个没有从频道读取,但这没什么用处.

  • 恢复引发的恐慌是我尝试过的,但它会关闭引发恐慌的 goroutine。在这种情况下,它将是“控制器”,所以它没有用:) (2认同)

you*_*sif 7

我知道这个答案太晚了,我已经写了这个解决方案,Hacking Go运行时,这不是安全的,它可能会崩溃:

import (
    "unsafe"
    "reflect"
)


func isChanClosed(ch interface{}) bool {
    if reflect.TypeOf(ch).Kind() != reflect.Chan {
        panic("only channels!")
    }

    // get interface value pointer, from cgo_export 
    // typedef struct { void *t; void *v; } GoInterface;
    // then get channel real pointer
    cptr := *(*uintptr)(unsafe.Pointer(
        unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
    ))

    // this function will return true if chan.closed > 0
    // see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go 
    // type hchan struct {
    // qcount   uint           // total data in the queue
    // dataqsiz uint           // size of the circular queue
    // buf      unsafe.Pointer // points to an array of dataqsiz elements
    // elemsize uint16
    // closed   uint32
    // **

    cptr += unsafe.Sizeof(uint(0))*2
    cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
    cptr += unsafe.Sizeof(uint16(0))
    return *(*uint32)(unsafe.Pointer(cptr)) > 0
}
Run Code Online (Sandbox Code Playgroud)

https://gist.github.com/youssifsayed/ca0cfcf9dc87905d37a4fee7beb253c2

  • 这可能非常聪明,但感觉就像在另一个问题上添加一个问题 (3认同)
  • `go vet` 在最后一行 `return *(*uint32)(unsafe.Pointer(cptr)) &gt; 0` 上返回“可能滥用 unsafe.Pointer” 和 `cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)) )))` 是否有一个选项可以在这些行中不使用 unsafe.Pointer 来执行此操作? (2认同)
  • 您需要在一个表达式中完成所有指针算术才能让兽医满意。这个解决方案是一个数据竞争并且不是有效的Go,您还必须至少使用atomic.LoadUint32 进行关闭的读取。不管怎样,这都是一个非常脆弱的 hack,如果 hchan 在 Go 版本之间发生变化,这就会崩溃。 (2认同)