芹菜和transaction.atomic

use*_*461 14 django postgresql transactions celery

在一些Django视图中,我使用这样的模式来保存对模型的更改,然后根据新的模型数据进行一些异步更新(例如生成图像,进一步更改模型). mytask是一个芹菜任务:

with transaction.atomic():
    mymodel.save()
    mytask.delay(mymodel.id).get()
Run Code Online (Sandbox Code Playgroud)

问题是任务永远不会返回.查看芹菜的日志,任务排队(我在日志中看到"已接收的任务"),但它永远不会完成.如果我将mytask.delay...get呼叫移出事务,则会成功完成.

transaction.atomic芹菜和芹菜之间是否有一些不相容之处?在Django 1.6或1.7中,我可以在一次交易中同时进行常规模型更新和单独任务流程的更新吗?

我的数据库是postgresql 9.1.我正在使用celery == 3.1.16/django-celery 3.1.16,amqp == 1.4.6,Django == 1.6.7,kombu == 3.0.23.代理后端是amqp,rabitmq是队列.

Joz*_*zef 24

正如@dotz所提到的,产生异步任务并立即阻塞并一直等到它完成后几乎没有用.

此外,如果您以这种方式(.get()最后)附加到它,您可以确保mymodel您的工作人员不会看到刚刚发出的实例更改因为它们还没有被提交 - 请记住您仍然在atomic块.

你可以做什么(来自Django 1.9)是延迟任务,直到提交当前活动事务后,使用django.db.transaction.on_commithook:

from django.db import transaction

with transaction.atomic():
    mymodel.save()
    transaction.on_commit(lambda:
        mytask.delay(mymodel.id))
Run Code Online (Sandbox Code Playgroud)

我经常在我的post_save信号处理程序中使用这种模式,触发一些新模型实例的处理.例如:

from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from . import models   # Your models defining some Order model
from . import tasks   # Your tasks defining a routine to process new instances

@receiver(post_save, sender=models.Order)
def new_order_callback(sender, instance, created, **kwargs):
    """ Automatically triggers processing of a new Order. """
    if created:
        transaction.on_commit(lambda:
            tasks.process_new_order.delay(instance.pk))
Run Code Online (Sandbox Code Playgroud)

但是,这样,如果数据库事务失败,则不会执行任务.这通常是理想的行为,但请记住.

编辑:以这种方式注册on_commit芹菜任务实际上更好(没有lambda):

transaction.on_commit(tasks.process_new_order.s(instance.pk).delay)
Run Code Online (Sandbox Code Playgroud)


dot*_*otz 7

"独立任务"=由工人运行的东西.

"芹菜工人"=另一个过程.

我不知道任何方法可以让您在2个或更多个进程之间共享一个数据库事务.你想要的是在同一个事务中以同步的方式运行任务,然后等待结果...但是,如果这就是你想要的,为什么你还需要一个任务队列呢?