Ran*_*dra 3 postgresql sqlalchemy celery pyramid
这就是我的代码
import transaction
@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
with transaction.manager:
try:
actual_fn(*args, **kwargs)
transaction.commit()
except:
transaction.abort()
Run Code Online (Sandbox Code Playgroud)
但是,我transaction.abort()似乎并没有退缩。此工人上的所有后续芹菜任务均失败。我收到以下错误
由于刷新期间先前存在异常,因此该会话的事务已回滚。要开始与此Session进行新事务,请首先发出Session.rollback()。
我究竟做错了什么?
更好的问题是,您将如何编写task_name_fn以便不会发生此问题?
第一件事是,您无需捕获异常即可中止事务。
import transaction
@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
with transaction.manager:
actual_fn(*args, **kwargs)
Run Code Online (Sandbox Code Playgroud)
如果发生异常,事务将中止。
接下来,您可以在任务装饰器中对其进行抽象。这样的事情(未经测试,但可能会按原样工作):
from functools import wraps
import transaction
def tm_task(f):
@wraps(f)
def decorated(*args, **kwargs):
with transaction.manager:
return f(*args, **kwargs)
return app.task()(decorated)
@tm_task
def actual_fn(*args, **kwargs):
pass # your function code here instead of calling other function
Run Code Online (Sandbox Code Playgroud)
另外,由于您正在使用事务,因此您可能希望在事务提交后延迟作业的排队。因为,例如,如果您在事务中插入一行并排队等待该行执行某项工作,则它可能在提交第一个事务之前到达工作器中,并且该行在事务外部尚不可用。就像是:
class AfterCommitTask(Task):
def apply_async(self, *args, **kw):
tx = transaction.get()
def hook(status):
if status: # Only queue if the transaction was succesfull.
super(AfterCommitTask, self).apply_async(*args, **kw)
tx.addAfterCommitHook(hook)
def tm_task(f):
@wraps(f)
def decorated(*args, **kwargs):
with transaction.manager:
return f(*args, **kwargs)
return app.task(base=AfterCommitTask)(decorated)
@tm_task
def actual_fn(*args, **kwargs):
pass # your function code here instead of calling other function
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
516 次 |
| 最近记录: |