Jam*_*mes 10 python concurrency persistence memoization file-locking
我有一个昂贵的函数,它接收并返回少量数据(一些整数和浮点数).我已经记住了这个功能,但我想让备忘录持久化.已经有几个与此相关的线程,但我不确定某些建议方法的潜在问题,我有一些相当具体的要求:
multiprocessing和来自不同的python脚本)该主题讨论了该shelve模块,该模块显然不是过程安全的.其中两个答案建议使用fcntl.flock锁定搁置文件.然而,这个帖子中的一些回答似乎表明这充满了问题 - 但我不确定它们是什么.这听起来好像只限于Unix(虽然显然Windows有一个等效的称为msvcrt.locking),而锁只是'建议' - 即它不会阻止我在不检查它被锁定的情况下意外写入文件.还有其他潜在的问题吗?写入文件的副本,并将主副本替换为最后一步,是否会降低腐败风险?
看起来dbm模块看起来不比搁置更好.我已经快速浏览了sqlite3,但为此目的看起来有点过分. 这个帖子和这个提到了几个第三方库,包括ZODB,但是有很多选择,而且它们对于这个任务来说都显得过于庞大和复杂.
有人有建议吗?
更新:有点提到下面的IncPy,看起来非常有趣.不幸的是,我不想回到Python 2.6(我实际上使用3.2),看起来使用C库有点尴尬(我大量使用numpy和scipy等).
kindall的另一个想法是有启发性的,但我认为将其调整为多个进程会有点困难 - 我想用文件锁定或数据库替换队列是最容易的.
再次看ZODB,它确实看起来非常适合这项任务,但我确实希望避免使用任何其他库.我还不完全确定简单使用的所有问题是什么flock- 我想象一个大问题是如果一个进程在写入文件时终止,或者在释放锁之前?
所以,我已经采用了synthesizerpatel的建议并且使用了sqlite3.如果有人感兴趣的话,我决定直接替换dict它将其条目存储为数据库中的泡菜(我不打算留在内存中,因为数据库访问和酸洗比我正在做的其他事情都要快).我确信有更有效的方法可以做到这一点(我不知道我是否仍然会遇到并发问题),但这里是代码:
from collections import MutableMapping
import sqlite3
import pickle
class PersistentDict(MutableMapping):
def __init__(self, dbpath, iterable=None, **kwargs):
self.dbpath = dbpath
with self.get_connection() as connection:
cursor = connection.cursor()
cursor.execute(
'create table if not exists memo '
'(key blob primary key not null, value blob not null)'
)
if iterable is not None:
self.update(iterable)
self.update(kwargs)
def encode(self, obj):
return pickle.dumps(obj)
def decode(self, blob):
return pickle.loads(blob)
def get_connection(self):
return sqlite3.connect(self.dbpath)
def __getitem__(self, key):
key = self.encode(key)
with self.get_connection() as connection:
cursor = connection.cursor()
cursor.execute(
'select value from memo where key=?',
(key,)
)
value = cursor.fetchone()
if value is None:
raise KeyError(key)
return self.decode(value[0])
def __setitem__(self, key, value):
key = self.encode(key)
value = self.encode(value)
with self.get_connection() as connection:
cursor = connection.cursor()
cursor.execute(
'insert or replace into memo values (?, ?)',
(key, value)
)
def __delitem__(self, key):
key = self.encode(key)
with self.get_connection() as connection:
cursor = connection.cursor()
cursor.execute(
'select count(*) from memo where key=?',
(key,)
)
if cursor.fetchone()[0] == 0:
raise KeyError(key)
cursor.execute(
'delete from memo where key=?',
(key,)
)
def __iter__(self):
with self.get_connection() as connection:
cursor = connection.cursor()
cursor.execute(
'select key from memo'
)
records = cursor.fetchall()
for r in records:
yield self.decode(r[0])
def __len__(self):
with self.get_connection() as connection:
cursor = connection.cursor()
cursor.execute(
'select count(*) from memo'
)
return cursor.fetchone()[0]
Run Code Online (Sandbox Code Playgroud)
我假设你想继续在RAM中记忆函数的结果,可能在字典中,但是使用持久性来减少应用程序的"预热"时间.在这种情况下,您不会直接在后备存储中随机访问项目,因此数据库可能确实有点过分(尽管作为合成器配件,可能没有您想象的那么多).
尽管如此,如果你想自己动手,一个可行的策略可能是在开始任何线程之前在运行开始时从文件中加载字典.当结果不在字典中时,您需要在将其添加到字典后将其写入文件.您可以通过将其添加到队列并使用单个工作线程将队列中的项刷新到磁盘(只需将它们附加到单个文件就可以了)来完成此操作.您可能偶尔会多次添加相同的结果,但这不是致命的,因为每次都会产生相同的结果,因此将其读回两次或更多次将不会造成真正的伤害.Python的线程模型将使您远离大多数类型的并发问题(例如,附加到列表是原子的).
这是一些(未经测试的,通用的,不完整的)代码,显示了我在说什么:
import cPickle as pickle
import time, os.path
cache = {}
queue = []
# run at script start to warm up cache
def preload_cache(filename):
if os.path.isfile(filename):
with open(filename, "rb") as f:
while True:
try:
key, value = pickle.load(f), pickle.load(f)
except EOFError:
break
cache[key] = value
# your memoized function
def time_consuming_function(a, b, c, d):
key = (a, b, c, d)
if key in cache:
return cache[key]
else:
# generate the result here
# ...
# add to cache, checking to see if it's already there again to avoid writing
# it twice (in case another thread also added it) (this is not fatal, though)
if key not in cache:
cache[key] = result
queue.append((key, result))
return result
# run on worker thread to write new items out
def write_cache(filename):
with open(filename, "ab") as f:
while True:
while queue:
key, value = queue.pop() # item order not important
# but must write key and value in single call to ensure
# both get written (otherwise, interrupting script might
# leave only one written, corrupting the file)
f.write(pickle.dumps(key, pickle.HIGHEST_PROTOCOL) +
pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
f.flush()
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
如果我有时间,我会把它变成装饰器......并将持久性放入dict子类......全局变量的使用也是次优的.:-)如果你使用这种方法,multiprocessing你可能想要使用multiprocessing.Queue而不是列表; 然后,您可以使用queue.get()阻塞等待写入文件的工作进程中的新结果.multiprocessing但是,我没有用过,所以请一点点建议.
| 归档时间: |
|
| 查看次数: |
2269 次 |
| 最近记录: |