jma*_*agh 2 python transactions sqlalchemy pyramid
我正在尝试构建一个Pyramid应用程序.我从SQLAlchemy脚手架开始.我遇到了一个问题,我想知道解决它的最佳方法是什么.在我的一个视图中,我需要从两个不相关的表中选择很多行.我需要确保在从第一个表中选择行和从第二个表中选择行的时间之间没有插入第二个表的行.
我有三个型号,Node,Test,和Tasking.双方Nodes并Tests有相当多的元数据.给定列表Nodes和Tests列表,Taskings可以创建全局列表.例如,我们可以有三个Nodes,a,b,和c两个Tests"我们需要一个节点都做任务P"和"我们需要两个节点做任务Q".
从这些信息中,Tasks应该创建三个.例如:
a应该做任务P"b应该做任务Q"c应该做任务Q"现在,我正在尝试为此提供REST API.绝大多数时间客户端都会请求列表Tasks,因此需要快速.但是,有时客户端可能会添加一个Node或一个Test.当发生这种情况时,我需要Tasks重新生成整个列表.
这是一个粗略的例子:
@view_config(route_name='list_taskings')
def list_taskings(request):
return DBSession.Query(Tasking).all()
@view_config(route_name='add_node')
def add_node(request):
DBSession.add(Node())
_update_taskings()
@view_config(route_name='add_test')
def add_test(request):
DBSession.add(Test())
_update_taskings()
def _update_taskings():
nodes = DBSession.query(Node).all()
tests = DBSession.query(Test).all()
# Process...
Tasking.query.delete()
for t in taskings:
DBSession.add(t)
Run Code Online (Sandbox Code Playgroud)
我正在使用默认的Pyramid SQLAlchemy脚手架.因此,每个请求都会自动启动一个事务.因此,如果_update_tasking从一个请求(例如add_node)调用,那么新节点将被添加到本地DBSession,并且查询all Nodes和Testsin _update_tasking将返回该新元素.此外,删除所有现有的Taskings并添加新计算的也是安全的.
我有两个问题:
如果一个新行被添加到会发生什么事Tests的时候,我让我的列表之间的表nodes和我的名单tests中_update_taskings?在我的真实世界生产系统中,这些选择是紧密相连但不是紧挨着彼此.有可能出现竞争条件.
如何确保两个更新请求的请求Taskings不会相互覆盖?例如,假设我们现有的系统有一个Node和一个Test.两个请求同时进入,一个添加一个Node,一个添加一个Test.即使问题#1不是问题,我知道每个请求的选择对表示"数据库中的单个时间实例",但仍然存在一个请求覆盖另一个请求的问题.如果第一个请求首先使用现在的两个Nodes和一个请求完成Test,则第二个请求仍将选择旧数据(可能),并将生成Taskings一个Node和两个的列表Tests.
那么,处理这个问题的最佳方法是什么?我在生产中使用SQLite进行开发和PostgreSQL,但我想要一个与数据库无关的解决方案.我并不担心其他应用程序访问此数据库.我的REST API将是唯一的访问机制.我是否应该锁定任何改变数据库的请求(添加a Node或a Test)?我应该以某种方式锁定数据库吗?
谢谢你的帮助!
使用可序列化的事务隔离级别应防止这两个问题.如果一个事务修改了可能影响另一个事务中先前读取结果的数据,则会发生序列化冲突.只有一个事务获胜,所有其他事务都被数据库中止以由客户端重新启动.SQLite通过锁定整个数据库来实现这一点,PostgreSQL采用了更为复杂的机制(详见文档).不幸的是,没有可移植的sqlalchemic方法来捕获序列化异常并重试.您需要编写特定于DB的代码,以便可靠地将其与其他错误区分开来.
我已经提出了一个示例程序,其中有两个线程同时修改数据(一个非常基本的方案再现),遇到冲突并重试:
https://gist.github.com/khayrov/6291557
使用Pyramid交易中间件和Zope事务管理器会更容易.捕获序列化错误后,不是手动重试,加注TransientError和中间件将重试整个请求tm.attempts(在paster配置中).
from transaction.interfaces import TransientError
class SerializationConflictError(TransientError):
def __init__(self, orig):
self.orig = orig
Run Code Online (Sandbox Code Playgroud)
您甚至可以pyramid_tm在堆栈中编写自己的中间件来捕获序列化错误,并将它们透明地转换为瞬态错误.
def retry_serializable_tween_factory(handler, registry):
def retry_tween(request):
try:
return handler(request)
except DBAPIError, e:
orig = e.orig
if getattr(orig, 'pgcode', None) == '40001':
raise SerializationConflictError(e)
elif isinstance(orig, sqlite3.DatabaseError) and \
orig.args == ('database is locked',):
raise SerializationConflictError(e)
else:
raise
return retry_tween
Run Code Online (Sandbox Code Playgroud)