Chi*_*chi 18 python django circular-dependency celery django-celery
我有一个Django应用程序,使用Celery来卸载一些任务.主要是,它推迟了数据库表中某些字段的计算.
所以,我有一个tasks.py:
from models import MyModel
from celery import shared_task
@shared_task
def my_task(id):
qs = MyModel.objects.filter(some_field=id)
for record in qs:
my_value = #do some computations
record.my_field = my_value
record.save()
Run Code Online (Sandbox Code Playgroud)
在models.py中
from django.db import models
from tasks import my_task
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
my_task.delay(id)
Run Code Online (Sandbox Code Playgroud)
显然,由于循环导入(models
导入tasks
和tasks
导入models
),这不起作用.
我暂时通过调用my_task.delay()
来解决这个问题views.py
,但是将模型逻辑保留在模型类中似乎是有意义的.有没有更好的方法呢?
Pio*_*iek 14
约书亚发布的解决方案非常好,但是当我第一次尝试时,我发现我的@receiver
装饰者没有效果.那是因为tasks
模块没有在任何地方导入,这是因为我使用了任务自动发现.
然而,有,另一种方式来分离tasks.py
从modules.py
.也就是说,任务可以按名称发送,并且不必在发送它们的过程中评估(导入)它们:
from django.db import models
#from tasks import my_task
import celery
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
#my_task.delay(id)
celery.current_app.send_task('myapp.tasks.my_task', (id,))
Run Code Online (Sandbox Code Playgroud)
send_task()
是Celery应用程序对象的一种方法.
在此解决方案中,为您的任务处理正确,可预测的名称非常重要.
Chi*_*and 12
在模型中my_task
,您可以在使用它之前导入它,而不是在文件的开头导入它.它将解决循环进口问题.
from django.db import models
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
from tasks import my_task # import here instead of top
my_task.delay(id)
Run Code Online (Sandbox Code Playgroud)
或者,你也可以做同样的事情tasks.py
.您可以在使用模型之前导入模型而不是开始模型.
替代方案:
您可以使用send_task
方法来调用您的任务
from celery import current_app
from django.db import models
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
current_app.send_task('myapp.tasks.my_task', (id,))
Run Code Online (Sandbox Code Playgroud)
只是为了在这个列表中再添一个不太好的解决方案,我最终做的就是依靠django现在内置的app注册表.
因此tasks.py
,您可以使用模型apps.get_model()
来访问模型,而不是从模型中导入.
我用一个带有健康文档的帮助方法来做这个,只是为了表达为什么这很痛苦:
from django.apps import apps
def _model(model_name):
"""Generically retrieve a model object.
This is a hack around Django/Celery's inherent circular import
issues with tasks.py/models.py. In order to keep clean abstractions, we use
this to avoid importing from models, introducing a circular import.
No solutions for this are good so far (unnecessary signals, inline imports,
serializing the whole object, tasks forced to be in model, this), so we
use this because at least the annoyance is constrained to tasks.
"""
return apps.get_model('my_app', model_name)
Run Code Online (Sandbox Code Playgroud)
然后:
@shared_task
def some_task(post_id):
post = _model('Post').objects.get(pk=post_id)
Run Code Online (Sandbox Code Playgroud)
你当然可以apps.get_model()
直接使用.
使用信号.
tasks.py
from models import MyModel, my_signal
from celery import shared_task
from django.dispatch import receiver
@shared_task
def my_task(id):
qs = MyModel.objects.filter(some_field=id)
for record in qs:
my_value = #do some computations
record.my_field = my_value
record.save()
@receiver(my_signal)
def my_receiver(sender, **kwargs):
my_task.delay(kwargs['id'])
Run Code Online (Sandbox Code Playgroud)
models.py
from django.db import models
from tasks import my_task
from django.dispatch import Signal
my_signal = Signal(providing_args=['id'])
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
my_signal.send(sender=?, id=?)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4017 次 |
最近记录: |