主动删除旧消息的字典+队列数据结构

dav*_*off 6 python queue data-structures

我想创建一个数据结构来表示一组队列(理想情况下是哈希、映射或类似查找的字典),其中队列中的消息在达到一定年龄后将被主动删除。ttl 值将是全局的;消息不需要也没有单独的 ttl。ttl 的分辨率不需要非常准确 - 只需在一秒左右即可。

我什至不知道要在这里搜索什么。我可以创建一个单独的全局队列,后台线程正在监视该队列,从全局队列中查看和拉出指向消息的指针,告诉它从各个队列中删除项目,但行为需要双向进行。如果某个项目从单独队列中删除,则需要从全局队列中删除。

我希望这个数据结构能够在 Python 中实现,理想情况下,并且一如既往,速度是最重要的(比内存使用更重要)。对于从哪里开始有什么建议吗?

sam*_*ias 4

我首先在单个类中对您正在寻找的行为进行建模,并尽可能简单地表达。稍后可以通过迭代优化来提高性能,但前提是有必要(您可能不需要)。

下面的课程所做的事情与您所描述的大致相似。队列只是命名并存储在字典中的列表。每条消息都带有时间戳并插入到列表的前面 (FIFO)。通过检查列表末尾的消息时间戳并将其弹出,直到遇到低于年龄阈值的消息来获取消息。

如果您计划从多个线程访问它,则需要添加一些细粒度的锁定以充分发挥其性能。例如,该reap()方法一次应该只锁定 1 个队列,而不是锁定所有队列(方法级同步),因此您还需要为每个命名队列保留一个锁。

更新- 现在使用一组全局存储桶(按时间戳,1 秒分辨率)来跟踪哪些队列包含该时间的消息。这减少了每次传递时要检查的队列数量。

import time
from collections import defaultdict

class QueueMap(object):

    def __init__(self):
        self._expire = defaultdict(lambda *n: defaultdict(int))
        self._store = defaultdict(list)
        self._oldest_key = int(time.time())

    def get_queue(self, name):
        return self._store.get(name, [])

    def pop(self, name):
        queue = self.get_queue(name)
        if queue:
            key, msg = queue.pop()
            self._expire[key][name] -= 1
            return msg
        return None

    def set(self, name, message):
        key = int(time.time())
        # increment count of messages in this bucket/queue
        self._expire[key][name] += 1
        self._store[name].insert(0, (key, message))

    def reap(self, age):
        now = time.time()
        threshold = int(now - age)
        oldest = self._oldest_key

        # iterate over buckets we need to check
        for key in range(oldest, threshold + 1):
            # for each queue with items, expire the oldest ones
            for name, count in self._expire[key].iteritems():
                if count <= 0:
                    continue

                queue = self.get_queue(name)
                while queue:
                    if queue[-1][0] > threshold:
                        break
                    queue.pop()
            del self._expire[key]

        # set oldest_key for next pass
        self._oldest_key = threshold
Run Code Online (Sandbox Code Playgroud)

用法:

qm = QueueMap()
qm.set('one', 'message 1')
qm.set('one', 'message 2')
qm.set('two', 'message 3')
print qm.pop('one')
print qm.get_queue('one')
print qm.get_queue('two')

# call this on a background thread which sleeps
time.sleep(2)
# reap messages older than 1 second
qm.reap(1)
# queues should be empty now
print qm.get_queue('one')
print qm.get_queue('two')
Run Code Online (Sandbox Code Playgroud)