我有以下代码:
@receiver(post_save, sender=SomeModel, dispatch_uid="build")
def handle_creation(sender, instance, created, **kwargs):
if created == True:
build.delay(instance)
@task()
def build(instance):
instance.status = 'Processing'
instance.save()
#some heavy instructions here
#. . . .
#. . . .
instance.status = 'Finished'
instance.save()
Run Code Online (Sandbox Code Playgroud)
我产生以下错误:
IntegrityError: duplicate key value violates unique constraint DETAIL: Key (id)=(13) already exists.
Run Code Online (Sandbox Code Playgroud)
但如果我先删除instance.save()一切正常。看起来 celery 处理任务时 sql 指令不完整。如何修复它?
谢谢。
您的代码有两个问题。
第一个是最大的,您将对象实例传递给任务 - 这在 celery 文档中被明确标记为错误的方法,基本上您正在做的是将对象序列化为某种状态并将其传递给 celery 进行处理;但与此同时,这个对象可能会改变;作为解决方案,您应将对象 id 作为参数传递,以便 celery 任务可以新鲜获取它:
build.delay(instance.pk)
...
@task
def build(my_key):
instance = SomeModel.objects.get(pk=my_key)
instance.status = 'Processing'
instance.save()
Run Code Online (Sandbox Code Playgroud)
第二个问题本质上很微妙,很少在雷达上显示出来。代码的第一部分可以在事务中调用,这意味着可能存在一种情况,您的任务(在 celery 中)将比事务提交更快,然后您的模型将首先保存在 celery 任务中,然后通过事务保存- 这里有一个问题。
如果您按照上面的建议更改代码,第二个问题描述的情况可能不会发生,或者会显示不同的错误。
为了避免此类问题,最好从transaction.oncommit处理程序中调用 celery 任务(在 Django 1.9 版本中引入)
还有一条评论,我可以看到您正在更改对象的状态:
instance.status = 'Processing'
Run Code Online (Sandbox Code Playgroud)
最有可能作为信息,但可能用作锁定机制...有一个非常好的选项select_for_update方法QuerySet,它允许您在事务期间锁定对象。当您执行以下操作时,这对于 celery 任务特别有用:
instance = SomeModel.objects.select_for_update().get(pk=my_key)
Run Code Online (Sandbox Code Playgroud)
它会停止你的任务等待其他任务完成(不要忘记放下@transaction.atomic这个任务)
如果您将传递nowait=True给select_for_update- 它将立即生成异常,以便您处理情况。
| 归档时间: |
|
| 查看次数: |
1043 次 |
| 最近记录: |