Mar*_*arc 9 python django scheduled-tasks celery celerybeat
我正在使用Celery 4.0.1,Django 1.10我有麻烦调度任务(运行任务工作正常).这是芹菜配置:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2
app.conf.CELERY_QUEUES = (
Queue('myapp.celery_default'),
Queue('myapp.queue1'),
Queue('myapp.queue2'),
Queue('myapp.queue3'),
)
Run Code Online (Sandbox Code Playgroud)
然后在tasks.py中我有:
@app.task(queue='myapp.queue1')
def my_task(some_id):
print("Doing something with", some_id)
Run Code Online (Sandbox Code Playgroud)
在views.py中,我想安排此任务:
def my_view(request, id):
app.add_periodic_task(10, my_task.s(id))
Run Code Online (Sandbox Code Playgroud)
然后我执行命令:
sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
Run Code Online (Sandbox Code Playgroud)
但是这项任务从未安排过.我在日志中看不到任何内容.这项任务正在起作用,因为如果在我看来我做了:
def my_view(request, id):
my_task.delay(id)
Run Code Online (Sandbox Code Playgroud)
任务已执行.
如果在我的配置文件中,如果我手动安排任务,那么它可以工作:
app.conf.CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.my_task',
'schedule': 10.0,
'args': (66,)
},
}
Run Code Online (Sandbox Code Playgroud)
我无法动态安排任务.任何的想法?
Dhi*_*aTN 20
实际上,您无法在视图级别定义周期性任务,因为节拍计划设置将首先加载,并且不能在运行时重新安排:
该
add_periodic_task()函数将在幕后添加条目到beat_schedule设置,同样的设置也可用于手动设置周期性任务:Run Code Online (Sandbox Code Playgroud)app.conf.CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'tasks.my_task', 'schedule': 10.0, 'args': (66,) }, }
这意味着如果你想使用add_periodic_task()它应该包装在on_after_configure芹菜应用程序级别的处理程序中,并且对运行时的任何修改都不会生效:
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10, my_task.s(66))
Run Code Online (Sandbox Code Playgroud)
如文档中所述,常规celerybeat只是跟踪任务执行:
默认调度程序是
celery.beat.PersistentScheduler,它只是跟踪本地搁置数据库文件中的最后运行时间.
为了能够动态管理周期性任务并在运行时重新安排celerybeat:
还有django-celery-beat扩展,它将计划存储在Django数据库中,并提供了一个方便的管理界面来管理运行时的周期性任务.
任务将保留在django数据库中,并且可以在db级别的任务模型中更新调度程序.每当您更新定期任务时,此任务表中的计数器将递增,并告知芹菜节拍服务从数据库重新加载计划.
您可能的解决方案如下:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
Run Code Online (Sandbox Code Playgroud)
views.py
def update_task_view(request, id)
task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
task.args=json.dumps([id])
task.save()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12456 次 |
| 最近记录: |