这是@Jimt编写的Go中的工作者和控制器模式的一个很好的例子,回答" 是否有一些优雅的方式来暂停和恢复golang中的任何其他goroutine? "
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Possible worker states.
const (
Stopped = 0
Paused = 1
Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main() {
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// Launch controller routine.
go func() {
controller(workers)
wg.Done()
}()
// Wait for all goroutines to finish.
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // Begin in the paused state.
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()
if state == Paused {
break
}
// Do actual work here.
}
}
}
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
// Start workers
for i := range workers {
workers[i] <- Running
}
// Pause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Paused
}
// Unpause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Running
}
// Shutdown workers.
<-time.After(1e9)
for i := range workers {
close(workers[i])
}
}
Run Code Online (Sandbox Code Playgroud)
但是这段代码也存在一个问题:如果要在退出workers时删除工作通道,则会worker()发生死锁.
如果你close(workers[i]),下次控制器写入它会引起恐慌,因为go无法写入封闭的通道.如果你使用一些互斥锁来保护它,那么它会被卡住,workers[i] <- Running因为worker它没有从通道读取任何东西而且写入将被阻止,而互斥锁将导致死锁.您还可以为通道提供更大的缓冲区作为解决方案,但这还不够好.
因此我认为解决这个问题的最佳方法是worker()退出时关闭通道,如果控制器发现通道关闭,它将跳过它并且什么都不做.但在这种情况下,我无法找到如何检查通道是否已经关闭.如果我尝试读取控制器中的通道,则可能会阻止控制器.所以我现在非常困惑.
PS:我已经尝试过恢复引起恐慌的恐慌,但它会关闭引起恐慌的goroutine.在这种情况下,它将是控制器,所以它是没用的.
不过,我认为Go团队在下一版Go中实现这个功能很有用.
Dus*_*tin 67
没有办法编写一个安全的应用程序,你需要知道一个频道是否打开而不与它交互.
做你想要做的事情的最好方法是有两个渠道 - 一个用于工作,一个用于表示改变状态的愿望(以及完成状态改变,如果这很重要).
频道很便宜.复杂的设计重载语义不是.
[也]
<-time.After(1e9)
Run Code Online (Sandbox Code Playgroud)
写作是一种非常令人困惑和非显而易见的方式
time.Sleep(time.Second)
Run Code Online (Sandbox Code Playgroud)
保持简单,每个人(包括你)都能理解它们.
zzz*_*zzz 49
以一种hacky的方式,可以通过恢复引发的恐慌来尝试写入的频道.但是如果没有读取它就无法检查读取通道是否已关闭.
要么你会
v <- c)v, ok <- c)v, ok <- c)v <- c)从技术上讲,只有最后一个没有从频道读取,但这没什么用处.
我知道这个答案太晚了,我已经写了这个解决方案,Hacking Go运行时,这不是安全的,它可能会崩溃:
import (
"unsafe"
"reflect"
)
func isChanClosed(ch interface{}) bool {
if reflect.TypeOf(ch).Kind() != reflect.Chan {
panic("only channels!")
}
// get interface value pointer, from cgo_export
// typedef struct { void *t; void *v; } GoInterface;
// then get channel real pointer
cptr := *(*uintptr)(unsafe.Pointer(
unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
))
// this function will return true if chan.closed > 0
// see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go
// type hchan struct {
// qcount uint // total data in the queue
// dataqsiz uint // size of the circular queue
// buf unsafe.Pointer // points to an array of dataqsiz elements
// elemsize uint16
// closed uint32
// **
cptr += unsafe.Sizeof(uint(0))*2
cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
cptr += unsafe.Sizeof(uint16(0))
return *(*uint32)(unsafe.Pointer(cptr)) > 0
}
Run Code Online (Sandbox Code Playgroud)
https://gist.github.com/youssifsayed/ca0cfcf9dc87905d37a4fee7beb253c2
| 归档时间: |
|
| 查看次数: |
89840 次 |
| 最近记录: |