如何正确使用sync.Cond?

Nat*_*man 23 synchronization go race-condition

我无法弄清楚如何正确使用sync.Cond.据我所知,在锁定Locker和调用条件的Wait方法之间存在竞争条件.此示例在主goroutine中的两条线之间添加了一个人工延迟来模拟竞争条件:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}
Run Code Online (Sandbox Code Playgroud)

[ 在游乐场跑步 ]

这会立即引起恐慌:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
    /usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
    /usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
    /tmp/sandbox301865429/main.go:17 +0x1a0

我究竟做错了什么?我该如何避免这种明显的竞争状况?我应该使用更好的同步构造吗?


编辑:我意识到我应该更好地解释我想在这里解决的问题.我有一个长期运行的goroutine,下载一个大文件和许多其他goroutine,当它们可用时需要访问HTTP标头.这个问题比听起来更难.

我不能使用频道,因为只有一个goroutine会收到该值.而其他一些goroutine将在它们已经可用之后很久就试图检索它们.

下载程序goroutine可以简单地将HTTP标头存储在变量中,并使用互斥锁来保护对它们的访问.但是,这并没有为其他goroutines提供"等待"它们可用的方法.

我曾经想过,一个sync.Mutex人和一个人sync.Cond都可以完成这个目标,但似乎这是不可能的.

gar*_*tor 18

OP回答了他自己,但没有直接回答原来的问题,我将发布如何正确使用sync.Cond.

sync.Cond如果每次写入和读取都有一个goroutine,那么你真的不需要- 只需一个sync.Mutex就可以在它们之间进行通信. sync.Cond在多个读者等待共享资源可用的情况下可能很有用.

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)

操场

话虽如此,如果情况允许,仍然建议使用频道传递数据.

注意:sync.WaitGroup这里仅用于等待goroutines完成执行.

  • 不确定是否真的需要那些 for 循环。我知道文档提出了类似的建议,但似乎没有必要。一个 if 应该就足够了,因为它更改了拥有锁的 sharedRsc 并且如文档所述,Wait 仅在调用 Broadcast 或 Signal 后恢复 (2认同)
  • @cpr4t3s 在这个例子中,当然。一般来说,这是一个很好的习惯/实践,因为在 Broadcast() 场景中,第一个 goroutine 唤醒,更改状态,然后其他协程恢复(共享状态现在已更改)。我同意你的观点,并且在任何一个等待中的 goroutines 不可能影响依赖条件的情况下是绝对正确的。 (2认同)

eri*_*ang 8

您需要确保调用c.Wait 调用c.Broadcast.您的程序的正确版本将是:

package main

import (
    "fmt"
    "sync"
)

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m)
    m.Lock()
    go func() {
        m.Lock() // Wait for c.Wait()
        c.Broadcast()
        m.Unlock()
    }()
    c.Wait() // Unlocks m, waits, then locks m again
    m.Unlock()
}
Run Code Online (Sandbox Code Playgroud)

https://play.golang.org/p/O1r8v8yW6h

  • C.Wait() 在返回之前再次锁定 m,因此我编辑了您的示例以在末尾添加 m.Unlock() 以确保正确性,否则您会惊讶地发现只有一名服务员醒来。 (3认同)

Nat*_*man 1

我终于找到了一种方法来做到这一点,而且它sync.Cond根本不涉及 - 只是互斥体。

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...do stuff...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}
Run Code Online (Sandbox Code Playgroud)

这是如何运作的?

互斥锁在任务开始时被锁定,确保任何调用WaitFor()都会被阻塞。一旦标头可用并且互斥体被 goroutine 解锁,每次调用WaitFor()都会一次执行一个。所有未来的调用(即使在 goroutine 结束之后)锁定互斥体都不会出现问题,因为它始终保持解锁状态。