具有并发读取器的 Golang 缓冲区

Tym*_*nix 3 concurrency buffer go writer reader

我想在 Go 中构建一个支持多个并发读取器和一个写入器的缓冲区。写入缓冲区的任何内容都应该被所有读者读取。新读者可以随时加入,这意味着已经写入的数据必须能够为迟到的读者回放。

缓冲区应满足以下接口:

type MyBuffer interface {
    Write(p []byte) (n int, err error)
    NextReader() io.Reader
}
Run Code Online (Sandbox Code Playgroud)

您对这种最好使用内置类型的实现有什么建议吗?

icz*_*cza 5

根据作者的性质和您使用它的方式,将所有内容保留在内存中(以便能够为稍后加入的读者重新播放所有内容)是非常危险的,并且可能需要大量内存,或者导致您的应用程序崩溃内存不足。

将它用于将所有内容保存在内存中的“低流量”记录器可能没问题,但例如流式传输一些音频或视频很可能不是。

如果下面的读取器实现读取了写入缓冲区的所有数据,则它们的Read()方法将io.EOF正确报告。必须小心,因为某些构造(例如bufio.Scanner)一旦io.EOF遇到可能无法读取更多数据(但这不是我们实现的缺陷)。

如果您希望我们的缓冲区的读取器在缓冲区中没有更多数据可用时等待,等待直到写入新数据而不是返回io.EOF,您可以将返回的读取器包装在此处显示的“尾读取器”中:Go:“tail -f"-like 生成器

“内存安全”文件实现

这是一个非常简单和优雅的解决方案。它使用文件写入,也使用文件读取。同步基本上由操作系统提供。这不会有内存不足错误的风险,因为数据仅存储在磁盘上。根据作者的性质,这可能足够也可能不够。

我宁愿使用以下界面,因为Close()在文件的情况下很重要。

type MyBuf interface {
    io.WriteCloser
    NewReader() (io.ReadCloser, error)
}
Run Code Online (Sandbox Code Playgroud)

实现非常简单:

type mybuf struct {
    *os.File
}

func (mb *mybuf) NewReader() (io.ReadCloser, error) {
    f, err := os.Open(mb.Name())
    if err != nil {
        return nil, err
    }
    return f, nil
}

func NewMyBuf(name string) (MyBuf, error) {
    f, err := os.Create(name)
    if err != nil {
        return nil, err
    }
    return &mybuf{File: f}, nil
}
Run Code Online (Sandbox Code Playgroud)

我们的mybuf类型 embeds *os.File,所以我们得到了“免费”的Write()Close()方法。

NewReader()简单地打开现有的备份文件进行读取(在只读模式),并返回它,再考虑它实现的优势io.ReadCloser

创建一个新MyBuf值正在NewMyBuf()函数中实现,error如果创建文件失败,该函数也可能返回一个。

笔记:

请注意,由于mybufembeds *os.File,即使它们不是接口的一部分,也可以使用类型断言“到达”其他导出的方法。我不认为这是一个缺陷,但如果你想禁止这一点,你必须改变实现不嵌入而是将它作为命名字段(但你必须自己添加和方法,正确转发到场地)。os.FileMyBufmybufos.FileWrite()Close()os.File

内存实现

如果文件实现还不够,这里有一个内存实现。

由于我们现在只在内存中,我们将使用以下接口:

type MyBuf interface {
    io.Writer
    NewReader() io.Reader
}
Run Code Online (Sandbox Code Playgroud)

这个想法是存储曾经传递到我们缓冲区的所有字节切片。读取Read()器将在调用时提供存储的切片,每个读取器将跟踪其Read()方法提供了多少存储的切片。必须处理同步,我们将使用一个简单的sync.RWMutex.

闲话少说,下面是实现:

type mybuf struct {
    data [][]byte
    sync.RWMutex
}

func (mb *mybuf) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Cannot retain p, so we must copy it:
    p2 := make([]byte, len(p))
    copy(p2, p)
    mb.Lock()
    mb.data = append(mb.data, p2)
    mb.Unlock()
    return len(p), nil
}

type mybufReader struct {
    mb   *mybuf // buffer we read from
    i    int    // next slice index
    data []byte // current data slice to serve
}

func (mbr *mybufReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Do we have data to send?
    if len(mbr.data) == 0 {
        mb := mbr.mb
        mb.RLock()
        if mbr.i < len(mb.data) {
            mbr.data = mb.data[mbr.i]
            mbr.i++
        }
        mb.RUnlock()
    }
    if len(mbr.data) == 0 {
        return 0, io.EOF
    }

    n = copy(p, mbr.data)
    mbr.data = mbr.data[n:]
    return n, nil
}

func (mb *mybuf) NewReader() io.Reader {
    return &mybufReader{mb: mb}
}

func NewMyBuf() MyBuf {
    return &mybuf{}
}
Run Code Online (Sandbox Code Playgroud)

请注意,Writer.Write()包含的一般约定是实现不能保留传递的切片,因此我们必须在“存储”它之前制作它的副本。

另请注意,Read()阅读器尝试锁定最少量的时间。也就是说,它只在我们需要来自缓冲区的新数据切片时才锁定,并且只进行读锁定,这意味着如果读取器有部分数据切片,它将在Read()不锁定和接触缓冲区的情况下发送它。