尝试更新地图时的竞争条件

Shr*_*Deb 0 rpc go race-condition goroutine

介绍

我正在尝试在 /go 中构建客户端服务器应用程序。

服务器只有一个实例,其中包含任务列表,这些任务只是字符串数据。服务器维护任务映射及其分配状态。

另一方面,客户端可以有多个实例,并向服务器请求任务。当分配任务时,服务器将分配状态更新为 true。

客户端和服务器之间的通信是通过go rpc进行的。

请参考这个图表为了更好的理解。

为了模拟多个客户端,我在单个应用程序中启动多个 go 例程,向服务器发出请求。

代码

服务器

package main

import (
    "fmt"
    "log"
    "net/http"
    "net/rpc"
    "sync"
)

type Container struct {
    mu   sync.Mutex
    list map[string]bool
}

var ListOfTasks Container

func (c *Container) UpdateList() string {
    var t string
    ListOfTasks.mu.Lock()
    defer ListOfTasks.mu.Unlock()
    for k, v := range c.list {
        if !v {
            c.list[k] = true
            fmt.Println("Task Name", k)
            return k
        }
    }
    return t
}

// RPC Code
type RPCServer struct{}

type Input struct {}

type Output struct {
    Message string
}

func (s *RPCServer) GetTask(i *Input, o *Output) error {
    o.Message = ListOfTasks.UpdateList()
    return nil
}
//

func init() {
    ListOfTasks = Container{}
    ListOfTasks.list = map[string]bool{
        "task1" : false,
        "task2" : false,
        "task3" : false,
    }
}

func main() {
    some := new(RPCServer)
    rpc.Register(some)
    rpc.HandleHTTP()

    fmt.Println("Listening on port 1234")
    err := http.ListenAndServe(":1234", nil)
    if err != nil {
        log.Fatal("bruh", err.Error())
    }
}
Run Code Online (Sandbox Code Playgroud)

客户

package main

import (
    "fmt"
    "log"
    "net/rpc"
    "sync"
)

type Input struct {
    Name string
}

type Output struct {
    Message string  
}

func main() {
    client, err := rpc.DialHTTP("tcp", "localhost:1234")
    if err != nil {
        log.Fatal("bruh!", err.Error())
    }

    arg := &Input{} 
    reply := Output{}

    var wg sync.WaitGroup

    wg.Add(4)

    for i := 0; i< 4; i++ {
    go func(gori int) {
        defer wg.Done()
        err = client.Call("RPCServer.GetTask", arg, &reply)
        if err != nil {
            log.Fatal("bruh!", err.Error())
        }
        fmt.Printf("#%d: message: %s\n", gori, reply.Message)
    }(i)    
    }

    wg.Wait()
}
Run Code Online (Sandbox Code Playgroud)

问题

当多个客户端向服务器发出请求请求任务时,我们会遇到竞争条件,而 golang 的竞争检测器无法检测到这一情况。可以在应用程序的输出中观察到它。

服务器的输出

Listening on port 1234
Task Name task1
Task Name task2
Task Name task3
Run Code Online (Sandbox Code Playgroud)

客户端的输出

#1: message: task3
#2: message: task3
#0: message: task3
#3: message: task3
Run Code Online (Sandbox Code Playgroud)

服务器为每个工作人员分配不同的任务,但在工作人员一端,每个人都收到相同的任务。

我已经锁定了整个 for 循环,我们在其中搜索未分配的任务。

我尝试过的其他事情(但没有成功)

  1. 使用sync.Map:仍然出现竞争条件,这是由 golang 竞争检测器检测到的。

  2. 仅应用锁定c.list[k] = true:这导致了我现在面临的相同问题。


我怀疑我应用锁的方式,但我不明白我做错了什么。

我知道这个问题是由于有多个 go 例程尝试读取和更新地图而引起的。因此,我对发生读写的整个部分应用锁。

我读到的大多数文章或 StackOverflow 帖子都将服务器和客户端放在同一个应用程序中,并通过使用通道来避免此问题。我不确定如何在我的用例中使用通道。

另外,最终这将成为一个映射缩减应用程序,因此除了表示分配状态的标志之外,我还必须存储更多的数据。

感谢任何帮助,谢谢!

Pau*_*kin 5

服务器是正确的(尽管代码有点混乱——ListOfTasks当你有一个指向你的指针接收器时,为什么要在全局上使用互斥体Collection?)。

但是客户端编码是错误的:您&reply在客户端的所有 goroutine 中使用相同的指针,这意味着当一个客户端 goroutine 收到回复时,所有 goroutine 都会看到它,然后在接收回复和写入回复之间存在竞争。您需要将 移至reply := Output{}goroutine 中。

我相信竞争检测器检测到这一点,但您必须在客户端上运行它,而不是在服务器上。