有效的循环缓冲?

jed*_*ikb 96 python circular-buffer

我想在python中创建一个有效的循环缓冲区(目标是取缓冲区中的整数值的平均值).

这是使用列表收集值的有效方法吗?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )
Run Code Online (Sandbox Code Playgroud)

什么会更有效(以及为什么)?

aar*_*ing 187

我会用collections.deque一个maxlenARG

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)
Run Code Online (Sandbox Code Playgroud)

文档中有一个与您想要的类似的配方deque.我断言它是最有效的,完全取决于它是由一个非常熟练的工作人员在C中实现的,这种工作人员习惯于开出顶级代码.

  • 我不喜欢这个解决方案,因为当定义`maxlen`时,文档不保证O(1)随机访问.当deque可以增长到无穷大时,O(n)是可以理解的,但是如果给出`maxlen`,则索引元素应该是恒定的时间. (7认同)
  • +1是的,包括漂亮的电池.循环缓冲区的操作是O(1),正如你所说的额外开销是C,所以应该还是非常快 (6认同)
  • 实际的Python实现(参见此处:https://github.com/python/cpython/blob/main/Modules/_collectionsmodule.c)使用了两种想法的混合:固定大小块的链接列表。即,它在内部将双端队列分成相同大小的内存块。这将减轻纯链表实现的一些成本,但长双端队列的内部项的访问时间仍然是“O(maxlen)”(又名“O(n)”) (4认同)

Joh*_*ooy 12

从列表的头部弹出会导致整个列表被复制,因此效率低下

您应该使用固定大小的列表/数组和在添加/删除项目时在缓冲区中移动的索引

  • 同意.无论它看起来多么优雅或不优雅,或者使用何种语言.实际上,您对垃圾收集器(或堆管理器或分页/映射机制或实际内存魔法)的麻烦越少越好. (3认同)
  • 我同意.这样做也比有些人想象的要容易得多.只需使用不断增加的计数器,并在访问项目时使用模运算符(%arraylen). (3认同)

Orv*_*var 9

Python的deque很慢.您也可以使用numpy.roll而不是 如何在numpy数组形状(n,)或(n,1)中旋转数字?

在这个基准测试中,deque是448ms.Numpy.roll是29ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

  • 这个答案非常具有误导性 - Python 的双端队列似乎非常快,但是在您链接到的基准测试中,从 numpy 数组到 numpy 数组的转换会大大减慢它的速度。 (11认同)
  • 但是“numpy.roll”返回数组的副本,对吧? (2认同)

Bas*_*asj 9

根据MoonCactus的答案,这是一个circularlist类.与他的版本的不同之处在于,这里 c[0]将始终给出最旧的附加元素,c[-1]最新附加的元素,c[-2]倒数第二个...这对于应用程序来说更自然.

c = circularlist(4)
c.append(1); print c, c[0], c[-1]    #[1]              1, 1
c.append(2); print c, c[0], c[-1]    #[1, 2]           1, 2
c.append(3); print c, c[0], c[-1]    #[1, 2, 3]        1, 3
c.append(8); print c, c[0], c[-1]    #[1, 2, 3, 8]     1, 8
c.append(10); print c, c[0], c[-1]   #[10, 2, 3, 8]    2, 10
c.append(11); print c, c[0], c[-1]   #[10, 11, 3, 8]   3, 11
Run Code Online (Sandbox Code Playgroud)

类:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
Run Code Online (Sandbox Code Playgroud)


Sma*_*ron 6

好吧使用deque类,但对于问题的重新解释(平均值),这是我的解决方案:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
Run Code Online (Sandbox Code Playgroud)

  • 我保证deque不会在内部使用numpy数组.`collections`是标准库的一部分,`numpy`不是.对第三方库的依赖会使得标准库变得糟糕. (15认同)
  • 对于我的帖子?,代码在任何地方都不使用numpy.¿? (4认同)

djv*_*jvg 5

尽管这里已经有很多不错的答案,但是我找不到所提及选项的任何时间直接比较。因此,请在下面的比较中找到我的谦虚尝试。

仅出于测试目的,该类可以在list基于缓冲区,collections.deque基于缓冲区和Numpy.roll基于缓冲区之间切换。

请注意,该update方法一次只添加一个值,以保持简单。

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)
Run Code Online (Sandbox Code Playgroud)

在我的系统上会产生:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
Run Code Online (Sandbox Code Playgroud)