高效的FIFO队列,用于Python中任意大小的字节块

Rog*_*ahl 17 python

我如何实现一个FIFO缓冲区,我可以有效地向头部添加任意大小的字节块,从中可以有效地从尾部弹出任意大小的字节块?

背景:

我有一个类,它以类似文件的对象从任意大小的块中读取字节,并且本身就是一个类文件对象,客户端可以从中读取任意大小的块中的字节.

我实现这一点的方法是,每当客户端想要读取一大块字节时,该类将重复读取基础文件类对象(具有适合这些对象的块大小)并将字节添加到FIFO的头部队列,直到队列中有足够的字节为客户端提供所请求大小的块.然后它将这些字节从队列尾部弹出并将它们返回给客户端.

当基础文件类对象的块大小远大于客户端从类中读取时使用的块大小时,会出现性能问题.

假设基础文件类对象的块大小为1 MiB,客户端读取的块大小为1 KiB.客户端第一次请求1 KiB时,该类必须读取1 MiB并将其添加到FIFO队列中.然后,对于该请求和随后的1023个请求,类必须从FIFO队列的尾部弹出1 KiB,其大小从1 MiB逐渐减小到0字节,此时循环再次开始.

我目前用StringIO对象实现了它.将新字节写入StringIO对象的末尾很快,但是从头开始删除字节非常慢,因为必须创建一个新的StringIO对象,它保存整个前一个缓冲区的副本减去第一个字节块.

处理类似问题的问题往往指向deque容器.但是,deque实现为双向链表.将一个块写入双端队列需要将块拆分为对象,每个对象包含一个字节.然后,deque将向每个对象添加两个指针用于存储,与字节相比,可能将存储器需求增加至少一个数量级.此外,遍历链表并处理每个对象都需要很长时间才能将块拆分为对象并将对象连接成块.

var*_*tec 14

我目前用StringIO对象实现了它.将新字节写入StringIO对象的末尾很快,但是从头开始删除字节非常慢,因为必须创建一个新的StringIO对象,它保存整个前一个缓冲区的副本减去第一个字节块.

实际上,实现FIFO的最典型方法是两个使用环绕缓冲区,其中有两个指针:

在此输入图像描述 图像源

现在,您可以StringIO()使用.seek()从适当位置读取/写入来实现它.


Cam*_*ron 12

更新:这是来自vartec答案的循环缓冲技术的实现(基于我原来的答案,保存在下面,对于那些好奇的人):

from cStringIO import StringIO

class FifoFileBuffer(object):
    def __init__(self):
        self.buf = StringIO()
        self.available = 0    # Bytes available for reading
        self.size = 0
        self.write_fp = 0

    def read(self, size = None):
        """Reads size bytes from buffer"""
        if size is None or size > self.available:
            size = self.available
        size = max(size, 0)

        result = self.buf.read(size)
        self.available -= size

        if len(result) < size:
            self.buf.seek(0)
            result += self.buf.read(size - len(result))

        return result


    def write(self, data):
        """Appends data to buffer"""
        if self.size < self.available + len(data):
            # Expand buffer
            new_buf = StringIO()
            new_buf.write(self.read())
            self.write_fp = self.available = new_buf.tell()
            read_fp = 0
            while self.size <= self.available + len(data):
                self.size = max(self.size, 1024) * 2
            new_buf.write('0' * (self.size - self.write_fp))
            self.buf = new_buf
        else:
            read_fp = self.buf.tell()

        self.buf.seek(self.write_fp)
        written = self.size - self.write_fp
        self.buf.write(data[:written])
        self.write_fp += len(data)
        self.available += len(data)
        if written < len(data):
            self.write_fp -= self.size
            self.buf.seek(0)
            self.buf.write(data[written:])
        self.buf.seek(read_fp)
Run Code Online (Sandbox Code Playgroud)

原始答案(由以上一个取代):

您可以使用缓冲区并跟踪起始索引(读取文件指针),当它变得太大时偶尔压缩它(这应该产生相当好的摊销性能).

例如,像这样包装一个StringIO对象:

from cStringIO import StringIO
class FifoBuffer(object):
    def __init__(self):
        self.buf = StringIO()

    def read(self, *args, **kwargs):
        """Reads data from buffer"""
        self.buf.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Appends data to buffer"""
        current_read_fp = self.buf.tell()
        if current_read_fp > 10 * 1024 * 1024:
            # Buffer is holding 10MB of used data, time to compact
            new_buf = StringIO()
            new_buf.write(self.buf.read())
            self.buf = new_buf
            current_read_fp = 0

        self.buf.seek(0, 2)    # Seek to end
        self.buf.write(*args, **kwargs)

        self.buf.seek(current_read_fp)
Run Code Online (Sandbox Code Playgroud)

  • +1这太棒了.感谢您的完整实施. (3认同)

Edd*_*iao 8

...但从头开始删除字节非常慢,因为必须创建一个新的 StringIO 对象,该对象包含整个先前缓冲区减去第一个字节块的副本。

可以通过bytearray在 Python>=v3.4 中使用来克服这种缓慢。请参阅此问题中的讨论,补丁在此处

关键是:从bytearrayby 中删除头字节

a[:1] = b''   # O(1) (amortized)
Run Code Online (Sandbox Code Playgroud)

a = a[1:]     # O(len(a))
Run Code Online (Sandbox Code Playgroud)

什么时候len(a)很大(比如 10**6)。

bytearray还提供了一种方便的方式来预览整个数据集作为一个数组(即本身),而相比之下,双端队列容器,其需要加入的对象成块。

现在可以按如下方式实现高效的 FIFO

class byteFIFO:
    """ byte FIFO buffer """
    def __init__(self):
        self._buf = bytearray()

    def put(self, data):
        self._buf.extend(data)

    def get(self, size):
        data = self._buf[:size]
        # The fast delete syntax
        self._buf[:size] = b''
        return data

    def peek(self, size):
        return self._buf[:size]

    def getvalue(self):
        # peek with no copy
        return self._buf

    def __len__(self):
        return len(self._buf)
Run Code Online (Sandbox Code Playgroud)

基准

import time
bfifo = byteFIFO()
bfifo.put(b'a'*1000000)        # a very long array
t0 = time.time()
for k in range(1000000):
    d = bfifo.get(4)           # "pop" from head
    bfifo.put(d)               # "push" in tail
print('t = ', time.time()-t0)  # t = 0.897 on my machine
Run Code Online (Sandbox Code Playgroud)

Cameron 回答中的循环/环形缓冲区实现需要 2.378 秒,而他/她的原始实现需要 1.108 秒。

  • 也可以使用“del”,如下所示:“del self._buf[:size]”来使用快速删除语法 (2认同)