X 个 goroutine 更新相同的变量

auj*_*jau 3 concurrency go race-condition

我想让 X 个 goroutineCountValue使用并行性进行更新(numRoutines 与你拥有的 CPU 数量一样多)。

解决方案一:

func count(numRoutines int) (countValue int) {
    var mu sync.Mutex
    k := func(i int) {
        mu.Lock()
        defer mu.Unlock()
        countValue += 5
    }
    for i := 0; i < numRoutines; i++ {
        go k(i)
    }
Run Code Online (Sandbox Code Playgroud)

它变成了数据竞争和返回的countValue = 0.

解决方案2:

func count(numRoutines int) (countValue int) {
    k := func(i int, c chan int) {
        c <- 5
    }
    c := make(chan int)
    for i := 0; i < numRoutines; i++ {
        go k(i, c)
    }

    for i := 0; i < numRoutines; i++ {
        countValue += <- c
    }
    return
}

Run Code Online (Sandbox Code Playgroud)

我对它做了一个基准测试,顺序添加比使用 goroutine 运行得更快。我认为这是因为我这里有两个 for 循环,当我放入countValue += <- c第一个 for 循环时,代码运行得更快。

解决方案3:

func count(numRoutines int) (countValue int) {
    var wg sync.WaitGroup

    c := make(chan int)

    k := func(i int) {
        defer wg.Done()
        c <- 5
    }
    for i := 0; i < numShards; i++ {
        wg.Add(1)
        go k(i)
    }

    go func() {
        for i := range c {
            countValue += i
        }
    }()

    wg.Wait()
    return
}

Run Code Online (Sandbox Code Playgroud)

仍然是比赛计数:/

有没有更好的方法来做到这一点?

Eli*_*gem 8

肯定有更好的方法来安全地增加变量:使用sync/atomic

import "sync/atomic"

var words int64
k := func() {
    _ = atomic.AddInt64(&words, 5) // increment atomically
}
Run Code Online (Sandbox Code Playgroud)

使用通道基本上消除了对互斥锁的需要,或者消除了并发访问变量本身的风险,而这里的等待组有点过大了

渠道:

words := 0
done := make(chan struct{}) // or use context
ch := make(chan int, numRoutines) // buffer so each routine can write
go func () {
    read := 0
    for i := range ch {
        words += 5 // or use i or something
        read++
        if read == numRoutines {
            break // we've received data from all routines
        }
    }
    close(done) // indicate this routine has terminated
}()
for i := 0; i < numRoutines; i++ {
    ch <- i // write whatever value needs to be used in the counting routine on the channel
}
<- done // wait for our routine that increments words to return
close(ch) // this channel is no longer needed
fmt.Printf("Counted %d\n", words)
Run Code Online (Sandbox Code Playgroud)

正如您所知,numRoutines不再是例程的数量,而是通道上写入的数量。您仍然可以将其转移到个人例程中:

for i := 0; i < numRoutines; i++ {
    go func(ch chan<- int, i int) {
        // do stuff here
        ch <- 5 * i // for example
    }(ch, i)
}
Run Code Online (Sandbox Code Playgroud)

等待组:

您可以使用 waitgroup +atomic 来获得相同的结果,而不是使用可以取消的上下文或通道。IMO 这样做的最简单方法是创建一个类型:

type counter struct {
    words int64
}

func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    _ = atomic.AddInt64(&c.words, i * 5) // whatever value you need to add
}

func main () {
    cnt := counter{}
    wg := sync.WaitGroup{}
    wg.Add(numRoutines) // create the waitgroup
    for i := 0; i < numRoutines; i++ {
        go cnt.doStuff(&wg, i)
    }
    wg.Wait() // wait for all routines to finish
    fmt.Println("Counted %d\n", cnt.words)
}
Run Code Online (Sandbox Code Playgroud)

修复您的第三个解决方案

正如我在评论中提到的:您的第三个解决方案仍然会导致竞争条件,因为通道c从未关闭,这意味着例程:

go func () {
    for i := range c {
        countValue += i
    }
}()
Run Code Online (Sandbox Code Playgroud)

永远不会回来。waitgroup 还仅确保您已发送通道上的所有值,但不能确保已countValue增加到其最终值。解决方法是在wg.Wait()返回后关闭通道,以便例程可以返回,并添加一个done可以在最后一个例程返回时关闭的通道,并<-done在返回之前添加一条语句。

func count(numRoutines int) (countValue int) {
    var wg sync.WaitGroup

    c := make(chan int)

    k := func(i int) {
        defer wg.Done()
        c <- 5
    }
    for i := 0; i < numShards; i++ {
        wg.Add(1)
        go k(i)
    }

    done := make(chan struct{})
    go func() {
        for i := range c {
            countValue += i
        }
        close(done)
    }()

    wg.Wait()
    close(c)
    <-done
    return
}
Run Code Online (Sandbox Code Playgroud)

不过,这增加了一些混乱,而且在我看来有点混乱。wg.Wait()将调用移至例程可能会更容易:

func count(numRoutines int) (countValue int) {
    var wg sync.WaitGroup

    c := make(chan int)

    // add wg as argument, makes it easier to move this function outside of this scope
    k := func(wg *sync.WaitGroup, i int) {
        defer wg.Done()
        c <- 5
    }
    wg.Add(numShards) // increment the waitgroup once
    for i := 0; i < numShards; i++ {
        go k(&wg, i)
    }

    go func() {
        wg.Wait()
        close(c) // this ends the loop over the channel
    }()
    // just iterate over the channel until it is closed
    for i := range c {
       countValue += i
    }
    // we've added all values to countValue
    return
}
Run Code Online (Sandbox Code Playgroud)

  • @aujau另外:从表面上看,你的问题可能看起来像一个基本问题,但这是一个真正的好问题的罕见例子。我们最近没见过很多东西。展示您的努力,包括显示您尝试过的内容的片段、对问题的清晰描述,并接受您可能错过了解决问题的另一种方法的想法。也许应该添加一个功能,为这样的问题提供赏金式的代表 (2认同)