let*_*4be 3 concurrency go channels
我需要读取从另一个 goroutine 设置的结构字段,即使知道肯定不会有并发访问(在读取发生之前写入完成,通过 发出信号chan struct{})也可能会导致陈旧数据
考虑到我可以保证没有并发访问,发送指向结构的指针(在第一个 goroutine 中创建,在第二个 goroutine 中修改,在第三个 goroutine 中读取)会解决可能的陈旧问题吗?
我想避免复制,因为结构很大并且包含填充在第二个 goroutine 中的巨大 Bytes.Buffer,我需要从第三个 goroutine 中读取
有一个锁定选项,但考虑到我知道不会有并发访问,这似乎有点矫枉过正
小智 5
对此有很多答案,这取决于您的数据结构和程序逻辑。
请参阅: 如何在并发 goroutines 期间锁定/同步对 Go 中变量的访问?
以及: 如何在 Golang 中使用 RWMutex?
1- 使用 有状态 Goroutines和通道
2- 使用sync.Mutex
3- 使用同步/原子
4- 使用 WaitGroup
5- 使用程序逻辑(信号量)
...
1:有状态的 Goroutines和通道:
我模拟了非常相似的示例(假设您想从一个 SSD 读取并以不同的速度写入另一个 SSD):
在此示例代码中,一个 goroutine(名为 write)做了一些准备数据并填充大数据的工作struct,另一个 goroutine(名为 read)从 big struct 读取数据,然后做一些工作,而管理 goroutine,保证不会并发访问相同的数据。三个 goroutine 之间的通信是通过通道完成的。在你的情况下,你可以使用通道数据的指针,或者像这个示例这样的全局结构。
输出将是这样的:
mean= 36.6920166015625 stdev= 6.068973186592054
我希望这可以帮助你理解这个想法。
工作示例代码:
package main
import (
"fmt"
"math"
"math/rand"
"runtime"
"sync"
"time"
)
type BigStruct struct {
big []uint16
rpos int
wpos int
full bool
empty bool
stopped bool
}
func main() {
wg.Add(1)
go write()
go read()
go manage()
runtime.Gosched()
stopCh <- <-time.After(5 * time.Second)
wg.Wait()
mean := Mean(hist)
stdev := stdDev(hist, mean)
fmt.Println("mean=", mean, "stdev=", stdev)
}
const N = 1024 * 1024 * 1024
var wg sync.WaitGroup
var stopCh chan time.Time = make(chan time.Time)
var hist []int = make([]int, 65536)
var s *BigStruct = &BigStruct{empty: true,
big: make([]uint16, N), //2GB
}
var rc chan uint16 = make(chan uint16)
var wc chan uint16 = make(chan uint16)
func next(pos int) int {
pos++
if pos >= N {
pos = 0
}
return pos
}
func manage() {
dataReady := false
var data uint16
for {
if !dataReady && !s.empty {
dataReady = true
data = s.big[s.rpos]
s.rpos++
if s.rpos >= N {
s.rpos = 0
}
s.empty = s.rpos == s.wpos
s.full = next(s.wpos) == s.rpos
}
if dataReady {
select {
case rc <- data:
dataReady = false
default:
runtime.Gosched()
}
}
if !s.full {
select {
case d := <-wc:
s.big[s.wpos] = d
s.wpos++
if s.wpos >= N {
s.wpos = 0
}
s.empty = s.rpos == s.wpos
s.full = next(s.wpos) == s.rpos
default:
runtime.Gosched()
}
}
if s.stopped {
if s.empty {
wg.Done()
return
}
}
}
}
func read() {
for {
d := <-rc
hist[d]++
}
}
func write() {
for {
wc <- uint16(rand.Intn(65536))
select {
case <-stopCh:
s.stopped = true
return
default:
runtime.Gosched()
}
}
}
func stdDev(data []int, mean float64) float64 {
sum := 0.0
for _, d := range data {
sum += math.Pow(float64(d)-mean, 2)
}
variance := sum / float64(len(data)-1)
return math.Sqrt(variance)
}
func Mean(data []int) float64 {
sum := 0.0
for _, d := range data {
sum += float64(d)
}
return sum / float64(len(data))
}
Run Code Online (Sandbox Code Playgroud)
5:对于某些用例的另一种方式(更快):
这里是另一种使用共享数据结构进行读取作业/写入作业/处理作业的方法,它在第一篇文章中被分开,现在在这里做同样的 3 个没有通道和互斥锁的作业。
工作样本:
package main
import (
"fmt"
"math"
"math/rand"
"time"
)
type BigStruct struct {
big []uint16
rpos int
wpos int
full bool
empty bool
stopped bool
}
func manage() {
for {
if !s.empty {
hist[s.big[s.rpos]]++ //sample read job with any time len
nextPtr(&s.rpos)
}
if !s.full && !s.stopped {
s.big[s.wpos] = uint16(rand.Intn(65536)) //sample wrire job with any time len
nextPtr(&s.wpos)
}
if s.stopped {
if s.empty {
return
}
} else {
s.stopped = time.Since(t0) >= 5*time.Second
}
}
}
func main() {
t0 = time.Now()
manage()
mean := Mean(hist)
stdev := StdDev(hist, mean)
fmt.Println("mean=", mean, "stdev=", stdev)
d0 := time.Since(t0)
fmt.Println(d0) //5.8523347s
}
var t0 time.Time
const N = 100 * 1024 * 1024
var hist []int = make([]int, 65536)
var s *BigStruct = &BigStruct{empty: true,
big: make([]uint16, N), //2GB
}
func next(pos int) int {
pos++
if pos >= N {
pos = 0
}
return pos
}
func nextPtr(pos *int) {
*pos++
if *pos >= N {
*pos = 0
}
s.empty = s.rpos == s.wpos
s.full = next(s.wpos) == s.rpos
}
func StdDev(data []int, mean float64) float64 {
sum := 0.0
for _, d := range data {
sum += math.Pow(float64(d)-mean, 2)
}
variance := sum / float64(len(data)-1)
return math.Sqrt(variance)
}
func Mean(data []int) float64 {
sum := 0.0
for _, d := range data {
sum += float64(d)
}
return sum / float64(len(data))
}
Run Code Online (Sandbox Code Playgroud)
为了在保留读取能力的同时防止对结构进行并发修改,您通常会嵌入一个sync.RWMutex。这不是豁免。您可以简单地在传输过程中锁定您的结构以进行写入,并在您方便的时间点将其解锁。
package main
import (
"fmt"
"sync"
"time"
)
// Big simulates your big struct
type Big struct {
sync.RWMutex
value string
}
// pump uses a groutine to take the slice of pointers to Big,
// locks the underlying structs and sends the pointers to
// the locked instances of Big downstream
func pump(bigs []*Big) chan *Big {
// We make the channel buffered for this example
// for illustration purposes
c := make(chan *Big, 3)
go func() {
for _, big := range bigs {
// We lock the struct before sending it to the channel
// so it can not be changed via pointer while in transit
big.Lock()
c <- big
}
close(c)
}()
return c
}
// sink reads pointers to the locked instances of Big
// reads them and unlocks them
func sink(c chan *Big) {
for big := range c {
fmt.Println(big.value)
time.Sleep(1 * time.Second)
big.Unlock()
}
}
// modify tries to achieve locks to the instances and modify them
func modify(bigs []*Big) {
for _, big := range bigs {
big.Lock()
big.value = "modified"
big.Unlock()
}
}
func main() {
bigs := []*Big{&Big{value: "Foo"}, &Big{value: "Bar"}, &Big{value: "Baz"}}
c := pump(bigs)
// For the sake of this example, we wait until all entries are
// send into the channel and hence are locked
time.Sleep(1 * time.Second)
// Now we try to modify concurrently before we even start to read
// the struct of which the pointers were sent into the channel
go modify(bigs)
sink(c)
// We use sleep here to keep waiting for modify() to finish simple.
// Usually, you'd use a sync.waitGroup
time.Sleep(1 * time.Second)
for _, big := range bigs {
fmt.Println(big.value)
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5257 次 |
| 最近记录: |