使用add_periodic_task动态设置Celery(celerybeat)中的周期性任务

Mar*_*arc 9 python django scheduled-tasks celery celerybeat

我正在使用Celery 4.0.1,Django 1.10我有麻烦调度任务(运行任务工作正常).这是芹菜配置:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)
Run Code Online (Sandbox Code Playgroud)

然后在tasks.py中我有:

@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)
Run Code Online (Sandbox Code Playgroud)

在views.py中,我想安排此任务:

def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))
Run Code Online (Sandbox Code Playgroud)

然后我执行命令:

sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
Run Code Online (Sandbox Code Playgroud)

但是这项任务从未安排过.我在日志中看不到任何内容.这项任务正在起作用,因为如果在我看来我做了:

def my_view(request, id):
    my_task.delay(id)
Run Code Online (Sandbox Code Playgroud)

任务已执行.

如果在我的配置文件中,如果我手动安排任务,那么它可以工作:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}
Run Code Online (Sandbox Code Playgroud)

我无法动态安排任务.任何的想法?

Dhi*_*aTN 20

实际上,您无法在视图级别定义周期性任务,因为节拍计划设置将首先加载,并且不能在运行时重新安排:

add_periodic_task()函数将在幕后添加条目到beat_schedule设置,同样的设置也可用于手动设置周期性任务:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}
Run Code Online (Sandbox Code Playgroud)

这意味着如果你想使用add_periodic_task()它应该包装在on_after_configure芹菜应用程序级别的处理程序中,并且对运行时的任何修改都不会生效:

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))
Run Code Online (Sandbox Code Playgroud)

文档中所述,常规celerybeat只是跟踪任务执行:

默认调度程序是celery.beat.PersistentScheduler,它只是跟踪本地搁置数据库文件中的最后运行时间.

为了能够动态管理周期性任务并在运行时重新安排celerybeat:

还有django-celery-beat扩展,它将计划存储在Django数据库中,并提供了一个方便的管理界面来管理运行时的周期性任务.

任务将保留在django数据库中,并且可以在db级别的任务模型中更新调度程序.每当您更新定期任务时,此任务表中的计数器将递增,并告知芹菜节拍服务从数据库重新加载计划.

您可能的解决方案如下:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
Run Code Online (Sandbox Code Playgroud)

views.py

def update_task_view(request, id)
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
    task.args=json.dumps([id])
    task.save()
Run Code Online (Sandbox Code Playgroud)


编辑:(13/01/2018)


最新版本4.1.0已经在这张票#3958中解决了这个主题,并已合并

  • 我相信最新版本(4.1.0之后)应该解决这个问题.这是正在进行的开发[#3958](https://github.com/celery/celery/pull/3958) (3认同)