我有多个线程运行相同的进程,需要能够相互通知在接下来的n秒内不应该处理某些事情,如果他们这样做的话,它不是世界末日.
我的目标是能够将字符串和TTL传递给缓存,并能够将缓存中的所有字符串作为列表获取.缓存可以存储在内存中,TTL不会超过20秒.
有没有人对如何实现这一点有任何建议?
Use*_*ser 26
OP正在使用python 2.7,但是如果你使用的是python 3,ExpiringDict
那么接受的答案中提到的答案目前已经过期了.对github仓库的最后一次提交是在2017年6月17日,并且有一个未解决的问题,它不适用于Python 3.5
最近维护的项目缓存工具(最后提交2018年6月14日)
pip install cachetools
from cachetools import TTLCache
cache = TTLCache(maxsize=10, ttl=360)
cache['apple'] = 'top dog'
...
>>> cache['apple']
'top dog'
... after 360 seconds...
>>> cache['apple']
KeyError exception thrown
Run Code Online (Sandbox Code Playgroud)
ttl
是几秒钟的生活时间.
enr*_*cis 23
您可以使用该expiringdict
模块:
该库的核心是
ExpiringDict
类,它是一个有序字典,具有用于缓存目的的自动过期值.
在描述中他们不谈多线程,所以为了不搞乱,使用一个Lock
.
iut*_*nvg 16
如果您不想使用任何第3个库,则可以在昂贵的函数中再添加一个参数:ttl_hash=None
。此新参数称为“时间敏感哈希”,其唯一目的是影响lru_cache
。
例如:
from functools import lru_cache
import time
@lru_cache()
def my_expensive_function(a, b, ttl_hash=None):
del ttl_hash # to emphasize we don't use it and to shut pylint up
return a + b # horrible CPU load...
def get_ttl_hash(seconds=3600):
"""Return the same value withing `seconds` time period"""
return round(time.time() / seconds)
# somewhere in your code...
res = my_expensive_function(2, 2, ttl_hash=get_ttl_hash())
# cache will be updated once in an hour
Run Code Online (Sandbox Code Playgroud)
Gra*_*ntJ 15
怎么运行的?
@functools.lru_cache
的支持进行缓存。maxsize
typed
Result
对象使用 记录函数的返回值和“死亡”时间time.monotonic() + ttl
。time.monotonic()
,如果当前时间超过“死亡”时间,则使用新的“死亡”时间重新计算返回值。显示代码:
from functools import lru_cache, wraps
from time import monotonic
def lru_cache_with_ttl(maxsize=128, typed=False, ttl=60):
"""Least-recently used cache with time-to-live (ttl) limit."""
class Result:
__slots__ = ('value', 'death')
def __init__(self, value, death):
self.value = value
self.death = death
def decorator(func):
@lru_cache(maxsize=maxsize, typed=typed)
def cached_func(*args, **kwargs):
value = func(*args, **kwargs)
death = monotonic() + ttl
return Result(value, death)
@wraps(func)
def wrapper(*args, **kwargs):
result = cached_func(*args, **kwargs)
if result.death < monotonic():
result.value = func(*args, **kwargs)
result.death = monotonic() + ttl
return result.value
wrapper.cache_clear = cached_func.cache_clear
return wrapper
return decorator
Run Code Online (Sandbox Code Playgroud)
如何使用它?
# Recalculate cached results after 5 seconds.
@lru_cache_with_ttl(ttl=5)
def expensive_function(a, b):
return a + b
Run Code Online (Sandbox Code Playgroud)
ttl=10
所有调用站点都不需要烦人的参数。(*args, **kwargs)
。接受的答案失败#2、#3、#4、#5 和#6。
不会主动驱逐过期的物品。仅当缓存达到最大大小时,才会驱逐过期的项目。如果缓存未达到最大大小(假设 maxsize 为None
),则不会发生驱逐。
(*args, **kwargs)
但是,对于缓存函数的每个唯一键/值对,仅存储一个键/值对。因此,如果只有 10 个不同的参数组合,那么缓存最多只有 10 个条目。
请注意,“时间敏感哈希”和“时间盐”解决方案要糟糕得多,因为具有相同键(但不同时间哈希/盐)的多个键/值缓存项留在缓存中。
Jav*_*zzi 13
我非常喜欢@iutinvg 的想法,我只是想更进一步;将它与必须知道将 传递给ttl
每个函数并使其成为装饰器的过程分离,这样您就不必考虑它了。如果您有django
, py3
, 并且不想 pip 安装任何依赖项,请尝试一下。
import time
from django.utils.functional import lazy
from functools import lru_cache, partial, update_wrapper
def lru_cache_time(seconds, maxsize=None):
"""
Adds time aware caching to lru_cache
"""
def wrapper(func):
# Lazy function that makes sure the lru_cache() invalidate after X secs
ttl_hash = lazy(lambda: round(time.time() / seconds), int)()
@lru_cache(maxsize)
def time_aware(__ttl, *args, **kwargs):
"""
Main wrapper, note that the first argument ttl is not passed down.
This is because no function should bother to know this that
this is here.
"""
def wrapping(*args, **kwargs):
return func(*args, **kwargs)
return wrapping(*args, **kwargs)
return update_wrapper(partial(time_aware, ttl_hash), func)
return wrapper
Run Code Online (Sandbox Code Playgroud)
证明它有效(用例子):
@lru_cache_time(seconds=10)
def meaning_of_life():
"""
This message should show up if you call help().
"""
print('this better only show up once!')
return 42
@lru_cache_time(seconds=10)
def multiply(a, b):
"""
This message should show up if you call help().
"""
print('this better only show up once!')
return a * b
# This is a test, prints a `.` for every second, there should be 10s
# between each "this better only show up once!" *2 because of the two functions.
for _ in range(20):
meaning_of_life()
multiply(50, 99991)
print('.')
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
Acu*_*nus 12
对于即将到期的内存高速缓存,对于一般用途,通常不通过字典而是通过函数或方法装饰器来执行此操作的常见设计模式。缓存字典在后台进行管理。这样,此答案在某种程度上补充了使用字典而不是修饰符的用户的答案。
将ttl_cache
在装饰cachetools==3.1.0
作品很像functools.lru_cache
,但有生存时间。
import cachetools.func
@cachetools.func.ttl_cache(maxsize=128, ttl=10 * 60)
def example_function(key):
return get_expensively_computed_value(key)
class ExampleClass:
EXP = 2
@classmethod
@cachetools.func.ttl_cache()
def example_classmethod(cls, i):
return i * cls.EXP
@staticmethod
@cachetools.func.ttl_cache()
def example_staticmethod(i):
return i * 3
Run Code Online (Sandbox Code Playgroud)
如果您想避免使用第三方包,可以添加自定义timed_lru_cache 装饰器,该装饰器基于 lru_cache 装饰器构建。
下面的默认生存期为 20 秒,最大大小为 128。请注意,整个缓存在 20 秒后过期,而不是单个项目。
from datetime import datetime, timedelta
from functools import lru_cache, wraps
def timed_lru_cache(seconds: int = 20, maxsize: int = 128):
def wrapper_cache(func):
func = lru_cache(maxsize=maxsize)(func)
func.lifetime = timedelta(seconds=seconds)
func.expiration = datetime.utcnow() + func.lifetime
@wraps(func)
def wrapped_func(*args, **kwargs):
if datetime.utcnow() >= func.expiration:
func.cache_clear()
func.expiration = datetime.utcnow() + func.lifetime
return func(*args, **kwargs)
return wrapped_func
return wrapper_cache
Run Code Online (Sandbox Code Playgroud)
然后,只需@timed_lru_cache()
在您的函数上方添加即可:
@timed_lru_cache()
def my_function():
# code goes here...
Run Code Online (Sandbox Code Playgroud)
我知道这有点旧,但是对于那些对没有第三方依赖项感兴趣的人来说,这是一个围绕内置函数的小包装器functools.lru_cache
(我在写这篇文章后注意到哈维尔的类似答案,但我想我还是发布了它,因为这不会需要Django):
import functools
import time
def time_cache(max_age, maxsize=128, typed=False):
"""Least-recently-used cache decorator with time-based cache invalidation.
Args:
max_age: Time to live for cached results (in seconds).
maxsize: Maximum cache size (see `functools.lru_cache`).
typed: Cache on distinct input types (see `functools.lru_cache`).
"""
def _decorator(fn):
@functools.lru_cache(maxsize=maxsize, typed=typed)
def _new(*args, __time_salt, **kwargs):
return fn(*args, **kwargs)
@functools.wraps(fn)
def _wrapped(*args, **kwargs):
return _new(*args, **kwargs, __time_salt=int(time.time() / max_age))
return _wrapped
return _decorator
Run Code Online (Sandbox Code Playgroud)
及其用法:
@time_cache(10)
def expensive(a: int):
"""An expensive function."""
time.sleep(1 + a)
print("Starting...")
expensive(1)
print("Again...")
expensive(1)
print("Done")
Run Code Online (Sandbox Code Playgroud)
注意这使用time.time
并带有所有警告。time.monotonic
如果可用/合适,您可能想改用它。
小智 7
我真的很喜欢@iutinvg 解决方案,因为它很简单。但是,我不想在每个需要缓存的函数中添加额外的参数。受到刘易斯和哈维尔回答的启发,我认为装饰器是最好的。然而,我不想使用第三方库(如哈维尔),我认为我可以改进刘易斯解决方案。这就是我想出的办法。
import time
from functools import lru_cache
def ttl_lru_cache(seconds_to_live: int, maxsize: int = 128):
"""
Time aware lru caching
"""
def wrapper(func):
@lru_cache(maxsize)
def inner(__ttl, *args, **kwargs):
# Note that __ttl is not passed down to func,
# as it's only used to trigger cache miss after some time
return func(*args, **kwargs)
return lambda *args, **kwargs: inner(time.time() // seconds_to_live, *args, **kwargs)
return wrapper
Run Code Online (Sandbox Code Playgroud)
我的解决方案使用 lambda 来获得更少的代码行和整数除法 ( //
),因此不需要强制转换为 int。
用法
@ttl_lru_cache(seconds_to_live=10)
def expensive(a: int):
"""An expensive function."""
time.sleep(1 + a)
print("Starting...")
expensive(1)
print("Again...")
expensive(1)
print("Done")
Run Code Online (Sandbox Code Playgroud)
注意:对于这些装饰器,您永远不应该设置maxsize=None
,因为随着时间的推移,缓存会增长到无穷大。
类似的东西?
from time import time, sleep
import itertools
from threading import Thread, RLock
import signal
class CacheEntry():
def __init__(self, string, ttl=20):
self.string = string
self.expires_at = time() + ttl
self._expired = False
def expired(self):
if self._expired is False:
return (self.expires_at < time())
else:
return self._expired
class CacheList():
def __init__(self):
self.entries = []
self.lock = RLock()
def add_entry(self, string, ttl=20):
with self.lock:
self.entries.append(CacheEntry(string, ttl))
def read_entries(self):
with self.lock:
self.entries = list(itertools.dropwhile(lambda x:x.expired(), self.entries))
return self.entries
def read_entries(name, slp, cachelist):
while True:
print "{}: {}".format(name, ",".join(map(lambda x:x.string, cachelist.read_entries())))
sleep(slp)
def add_entries(name, ttl, cachelist):
s = 'A'
while True:
cachelist.add_entry(s, ttl)
print("Added ({}): {}".format(name, s))
sleep(1)
s += 'A'
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
cl = CacheList()
print_threads = []
print_threads.append(Thread(None, read_entries, args=('t1', 1, cl)))
# print_threads.append(Thread(None, read_entries, args=('t2', 2, cl)))
# print_threads.append(Thread(None, read_entries, args=('t3', 3, cl)))
adder_thread = Thread(None, add_entries, args=('a1', 2, cl))
adder_thread.start()
for t in print_threads:
t.start()
for t in print_threads:
t.join()
adder_thread.join()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
17962 次 |
最近记录: |