auj*_*jau 3 concurrency go race-condition
我想让 X 个 goroutineCountValue使用并行性进行更新(numRoutines 与你拥有的 CPU 数量一样多)。
解决方案一:
func count(numRoutines int) (countValue int) {
var mu sync.Mutex
k := func(i int) {
mu.Lock()
defer mu.Unlock()
countValue += 5
}
for i := 0; i < numRoutines; i++ {
go k(i)
}
Run Code Online (Sandbox Code Playgroud)
它变成了数据竞争和返回的countValue = 0.
解决方案2:
func count(numRoutines int) (countValue int) {
k := func(i int, c chan int) {
c <- 5
}
c := make(chan int)
for i := 0; i < numRoutines; i++ {
go k(i, c)
}
for i := 0; i < numRoutines; i++ {
countValue += <- c
}
return
}
Run Code Online (Sandbox Code Playgroud)
我对它做了一个基准测试,顺序添加比使用 goroutine 运行得更快。我认为这是因为我这里有两个 for 循环,当我放入countValue += <- c第一个 for 循环时,代码运行得更快。
解决方案3:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
go func() {
for i := range c {
countValue += i
}
}()
wg.Wait()
return
}
Run Code Online (Sandbox Code Playgroud)
仍然是比赛计数:/
有没有更好的方法来做到这一点?
肯定有更好的方法来安全地增加变量:使用sync/atomic:
import "sync/atomic"
var words int64
k := func() {
_ = atomic.AddInt64(&words, 5) // increment atomically
}
Run Code Online (Sandbox Code Playgroud)
使用通道基本上消除了对互斥锁的需要,或者消除了并发访问变量本身的风险,而这里的等待组有点过大了
words := 0
done := make(chan struct{}) // or use context
ch := make(chan int, numRoutines) // buffer so each routine can write
go func () {
read := 0
for i := range ch {
words += 5 // or use i or something
read++
if read == numRoutines {
break // we've received data from all routines
}
}
close(done) // indicate this routine has terminated
}()
for i := 0; i < numRoutines; i++ {
ch <- i // write whatever value needs to be used in the counting routine on the channel
}
<- done // wait for our routine that increments words to return
close(ch) // this channel is no longer needed
fmt.Printf("Counted %d\n", words)
Run Code Online (Sandbox Code Playgroud)
正如您所知,numRoutines不再是例程的数量,而是通道上写入的数量。您仍然可以将其转移到个人例程中:
for i := 0; i < numRoutines; i++ {
go func(ch chan<- int, i int) {
// do stuff here
ch <- 5 * i // for example
}(ch, i)
}
Run Code Online (Sandbox Code Playgroud)
您可以使用 waitgroup +atomic 来获得相同的结果,而不是使用可以取消的上下文或通道。IMO 这样做的最简单方法是创建一个类型:
type counter struct {
words int64
}
func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
defer wg.Done()
_ = atomic.AddInt64(&c.words, i * 5) // whatever value you need to add
}
func main () {
cnt := counter{}
wg := sync.WaitGroup{}
wg.Add(numRoutines) // create the waitgroup
for i := 0; i < numRoutines; i++ {
go cnt.doStuff(&wg, i)
}
wg.Wait() // wait for all routines to finish
fmt.Println("Counted %d\n", cnt.words)
}
Run Code Online (Sandbox Code Playgroud)
正如我在评论中提到的:您的第三个解决方案仍然会导致竞争条件,因为通道c从未关闭,这意味着例程:
go func () {
for i := range c {
countValue += i
}
}()
Run Code Online (Sandbox Code Playgroud)
永远不会回来。waitgroup 还仅确保您已发送通道上的所有值,但不能确保已countValue增加到其最终值。解决方法是在wg.Wait()返回后关闭通道,以便例程可以返回,并添加一个done可以在最后一个例程返回时关闭的通道,并<-done在返回之前添加一条语句。
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
done := make(chan struct{})
go func() {
for i := range c {
countValue += i
}
close(done)
}()
wg.Wait()
close(c)
<-done
return
}
Run Code Online (Sandbox Code Playgroud)
不过,这增加了一些混乱,而且在我看来有点混乱。wg.Wait()将调用移至例程可能会更容易:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
// add wg as argument, makes it easier to move this function outside of this scope
k := func(wg *sync.WaitGroup, i int) {
defer wg.Done()
c <- 5
}
wg.Add(numShards) // increment the waitgroup once
for i := 0; i < numShards; i++ {
go k(&wg, i)
}
go func() {
wg.Wait()
close(c) // this ends the loop over the channel
}()
// just iterate over the channel until it is closed
for i := range c {
countValue += i
}
// we've added all values to countValue
return
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1913 次 |
| 最近记录: |