我正在尝试建立一个系统,优雅地将数据库操作推迟到一个单独的线程,以避免在Twisted回调期间阻塞.
到目前为止,这是我的方法:
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from twisted.internet.threads import deferToThread
_engine = create_engine(initialization_string)
Session = scoped_session(sessionmaker(bind=_engine))
@contextmanager
def transaction_context():
session = Session()
try:
yield session
session.commit()
except:
# No need to do session.rollback(). session.remove will do it.
raise
finally:
session.remove()
def threaded(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
return deferToThread(fn, *args, **kwargs)
return wrapper
Run Code Online (Sandbox Code Playgroud)
这应该允许我用threaded装饰器包装一个函数,然后transaction_context在所述函数体中使用上下文管理器.以下是一个例子:
from __future__ import print_function
from my_lib.orm import User, transaction_context, threaded
from twisted.internet import reactor
@threaded
def get_n_users(n):
with transaction_context() as session:
return session.query(User).limit(n).all()
if __name__ == '__main__':
get_n_users(n).addBoth(len)
reactor.run()
Run Code Online (Sandbox Code Playgroud)
但是,当我运行上面的脚本时,我得到一个包含以下回溯的失败:
Unhandled error in Deferred:
Unhandled Error
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 781, in __bootstrap
self.__bootstrap_inner()
File "/usr/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
--- <exception caught here> ---
File "/usr/local/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 191, in _worker
result = context.call(ctx, function, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
return func(*args,**kw)
File "testaccess.py", line 9, in get_n_users
return session.query(User).limit(n).all()
File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/home/louis/Documents/Python/knacki/knacki/db.py", line 36, in transaction_context
session.remove()
exceptions.AttributeError: 'Session' object has no attribute 'remove'
Run Code Online (Sandbox Code Playgroud)
我根本没想到这一点.我错过了什么?我没有scoped_session正确地实例化吗?
编辑: 这是一个关于将此设置与Twisted集成的相关问题.它可能有助于澄清我正在努力实现的目标.
Mar*_*ams 13
简短的回答
打电话.remove()的Session,不是session.
答案很长:
scoped_session并没有真正回归一Session堂课.相反,它创建了一个对象,该对象关注它所调用的线程.调用它将返回Session与该线程关联的现有实例或关联新线程并返回该线程.一个线程本地就是一个线程与会话关联.
对象remove上的方法scoped_session删除当前与调用它的线程关联的会话对象.这意味着它是相反的scoped_session.__call__,这是一种令人困惑的API.
这是一个简短的Python脚本来说明行为.
import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
_engine = create_engine('sqlite:///:memory:')
Session = scoped_session(sessionmaker(_engine))
def scoped_session_demo(remove=False):
ids = []
def push_ids():
thread_name = threading.currentThread().getName()
data = [thread_name]
data.append(Session())
if remove:
Session.remove()
data.append(Session())
ids.append(data)
t = threading.Thread(target=push_ids)
t.start()
t.join()
push_ids()
sub_thread, main_thread = ids
sub_name, sub_session_a, sub_session_b = sub_thread
main_name, main_session_a, main_session_b = main_thread
print sub_name, sub_session_a == sub_session_b
print main_name, main_session_a == main_session_b
print sub_name, '==', main_name, sub_session_a == main_session_b
print 'Without remove:'
scoped_session_demo()
print 'With remove:'
scoped_session_demo(True)
Run Code Online (Sandbox Code Playgroud)
它的输出:
Without remove:
Thread-1 True
MainThread True
Thread-1 == MainThread False
With remove:
Thread-2 False
MainThread False
Thread-2 == MainThread False
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10675 次 |
| 最近记录: |