Tar*_*zan 2 transactions celery pyramid
在这里我的情况.
我正在构建一个RESTful Web服务,从客户端接收数据,然后从这些数据创建一个事件,然后我想将这个新事件推送到芹菜中以异步处理它.
我使用金字塔构建RESTful webservice,使用pyramid_celery使金字塔和芹菜一起工作.
这是我的观点的源代码:
# views.py
# This code recive data from client, then create a new record Event from this
posted_data = schema.deserialize(request.POST.mixed())
e = Event()
e.__dict__.update(posted_data)
DBSession.add(e)
transaction.commit()
print "Commited #%d" % e.id # code mark 01
fire_event.delay(e.id) # fire_event is a celery task
logging.getLogger(__name__).info('Add event #%d to tasks' % e.id)
Run Code Online (Sandbox Code Playgroud)
这是我的任务的源代码:
# tasks.py
@celery.task()
def fire_event(event_id):
e = DBSession.query(Event).get(event_id)
if e is None:
return
print "Firing event %d#%s" % (event_id, e)
logger.info("Firing event %d#%s", event_id, e)
Run Code Online (Sandbox Code Playgroud)
如果我使用金字塔的炼金术脚手架中的默认代码,则会在代码标记01行处引发异常.像这样的例外:
DetachedInstanceError: Instance <Event at ...> is not bound to a Session; ...
Run Code Online (Sandbox Code Playgroud)
从ZopeAlchemy文档中,为了避免这个异常,我配置了DBSession:
# models.py
DBSession = scoped_session(sessionmaker(
extension=ZopeTransactionExtension(keep_session=True)
))
Run Code Online (Sandbox Code Playgroud)
现在我的问题是我的RESTful请求完成后,我的MySQL服务器的金字塔保持事务.当RESTful请求完成后,我转到MySQL Server并运行命令:
SHOW engine innodb status;
Run Code Online (Sandbox Code Playgroud)
从结果来看,我看到了这个:
--TRANSACTION 180692, ACTIVE 84 sec
MySQL thread id 94, OS thread handle 0x14dc, query id 1219 [domain] [ip] [project name] cleaning up
Trx read view will not see trx with id >= 180693, sees < 180693
Run Code Online (Sandbox Code Playgroud)
这意味着金字塔仍然保持连接,没关系,但金字塔也开始交易,这是一个问题.当我尝试使用其他工具访问我的MySQL服务器时,此事务可以使我处于锁定状态.
我的问题是:
一旦RESTful请求完成,我怎样才能让Pyramid关闭事务.如果我不能,那么我的情况会有另一种解决方案吗?
非常感谢.
Celery保持一种"透明地"运行代码作为任务的错觉 - 你用你的函数装饰你的函数@task,然后使用my_function.delay(),一切都神奇地起作用.
事实上,实现起来有点棘手,你的代码在一个完全不同的进程中运行,可能在另一台机器上运行,可能是几分钟/小时后,并且该进程中不存在Pyramid请求/响应周期,因此ZopeTransactionExtension不能用于在请求完成时自动在工作进程中提交事务 - 因为没有请求,只有一个长时间运行的工作进程.
因此,金字塔不会将未完成的交易挂起 - 这是您的工作流程.当您调用e = DBSession.query(Event).get(event_id)并且永远不会完成时,事务由SQLAlchemy启动.
在这里,我写了一个类似问题的更长答案,其中包含更多细节:https://stackoverflow.com/a/16346587/320021 - 重点是为工作进程使用不同的会话
另一件事是,最好避免transaction.commit()在你的金字塔代码中使用,因为对象过期和其他丑陋.在金字塔中,可以在请求完成后调用一个函数 - 我写了一个函数,它注册一个从那里调用芹菜任务的回调:
from repoze.tm import after_end
import transaction
def invoke_task_after_commit(task_fn, task_args, task_kwargs):
"""
This should ONLY be used within the web-application process managed by repoze.tm2
otherwise a memory leak will result. See http://docs.repoze.org/tm2/#cleanup
for more details.
"""
t = transaction.get() # the current transaction
def invoke():
task_fn.apply_async(
args=task_args,
kwargs=task_kwargs,
)
after_end.register(invoke, t)
Run Code Online (Sandbox Code Playgroud)
(我从函数中删除了很多不相关的代码,因此可能存在拼写错误等.视为伪代码)
| 归档时间: |
|
| 查看次数: |
1100 次 |
| 最近记录: |