带通道的读写排除

lbo*_*onn 5 concurrency go

我想在Go中编写一个小的内存数据库.读取和写入请求将通过通道传递并由db引擎处理,这将确保访问正确完成.

第一个想法是模仿RWMutex的行为.只有它会使用更惯用的风格.

这是我想要做的一个小玩具(虽然,相当长)的例子.

package main

import (
    "log"
    "math/rand"
    "time"
)

var source *rand.Rand

type ReqType int

const (
    READ = iota
    WRITE
)

type DbRequest struct {
    Type  int              // request type
    RespC chan *DbResponse // channel for request response
    // content here
}

type DbResponse struct {
    // response here
}

type Db struct {
    // DB here
}

func randomWait() {
    time.Sleep(time.Duration(source.Intn(1000)) * time.Millisecond)
}

func (d *Db) readsHandler(in <-chan *DbRequest) {
    for r := range in {
        id := source.Intn(4000000)
        log.Println("read ", id, " starts")
        randomWait()
        log.Println("read ", id, " ends")
        r.RespC <- &DbResponse{}
    }
}

func (d *Db) writesHandler(r *DbRequest) *DbResponse {
    id := source.Intn(4000000)
    log.Println("write ", id, " starts")
    randomWait()
    log.Println("write ", id, " ends")
    return &DbResponse{}
}

func (d *Db) Start(nReaders int) chan *DbRequest {
    in := make(chan *DbRequest, 100)
    reads := make(chan *DbRequest, nReaders)

    // launch readers
    for k := 0; k < nReaders; k++ {
        go d.readsHandler(reads)
    }

    go func() {
        for r := range in {
            switch r.Type {
            case READ:
                reads <- r
            case WRITE:
                // here we should wait for all reads to
                // be over (how ??)

                r.RespC <- d.writesHandler(r)

                // here writesHandler is blocking,
                // this ensures that no additional
                // read is added in the reads channel
                // before the write is finished
            }
        }
    }()

    return in
}

func main() {
    seed := time.Now().Unix()
    source = rand.New(rand.NewSource(seed))

    blackhole := make(chan *DbResponse, 100)

    d := Db{}
    rc := d.Start(4)
    wc := time.After(3 * time.Second)

    go func() {
        for {
            <-blackhole
        }
    }()

    for {
        select {
        case <-wc:
            return
        default:
            if source.Intn(2) == 0 {
                rc <- &DbRequest{READ, blackhole}
            } else {
                rc <- &DbRequest{WRITE, blackhole}
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当然,这个例子显示了读/写冲突.

我觉得我正在尝试做一些有点邪恶的事情:使用旨在避免它的构造共享内存......在这一点上,一个明显的解决方案是在两种类型的请求处理周围添加RWMutex锁,但也许有一个只使用goroutines和渠道的聪明解决方案.

Son*_*nia 6

为什么不直接使用RWMutex?它经过优化,非常高效,而且概念上很简单.只需在Db对象中嵌入一个

type Db struct {
    sync.RWMutex
    // DB here
}
Run Code Online (Sandbox Code Playgroud)

你可以称之为

db := &Db{}
...
db.Lock()
// do RW operations
db.Unlock()
...
db.RLock()
// do Read operations
db.RUnlock()
Run Code Online (Sandbox Code Playgroud)

我不知道如何使用频道获得更好的性能.您可以但是获得与无锁技术更好的性能,但我建议让您的RWMutex版本首先运行.

另一个并发问题是fmt包写入stdout不是线程安全的,你最终会看到乱码输出.请尝试使用日志包.您可以将其设置为写入没有日志记录前缀的stdout,它将确保原子写入.

  • 最好的参考是Dmitry Vyukov的1024核心(http://www.1024cores.net/).我是初学者,几乎不知道我在做什么,但我确实为Rosetta Code做了一个例子.请参阅http://rosettacode.org/wiki/Atomic_updates.在两个基于通道的版本之间,一个版本比另一个版本快10倍.RWMutex版本再次运行速度提高约50%,然后无锁版本再次运行速度提高两倍.比最天真的基于频道的版本提高了300倍! (2认同)