使用python而不使用django在celery中执行周期性任务

Man*_*dav 5 python-2.7 celery-task celerybeat

大家好,我是 celery 和 python 的新手。我正在使用rabbitmq-server 创建一个简单的任务。但我不知道如何在 python 中使用 celerybeat 实现周期性任务。我搜索但每个地方我都会用 django 定期执行任务。

我使用此代码作为tasks.py: from celery import Celery from time import strftime

app = Celery('tasks',broker='pyamqp://guest@localhost//')

@app.task
def show_time():
    return strftime('%Y-%m-%d %H:%M:%S')
Run Code Online (Sandbox Code Playgroud)

运行任务.py:

from tasks import show_time
show_time.delay()
Run Code Online (Sandbox Code Playgroud)

谢谢你的时间。

Man*_*dav 4

最后经过一些更简洁的搜索我找到了解决方案

from celery import Celery
from kombu import Queue, Exchange



class Config(object):
    CELERY_QUEUES = (
        Queue(
            'try',
            exchange=Exchange('try'),
            routing_key='try',
        ),
    )
celery =Celery('tasks',broker='pyamqp://guest@localhost//')

celery.config_from_object(Config)


celery.conf.beat_schedule = {
    'planner': {
        'task': 'task_planner.some_task',
        'schedule': 5.0,
    },
}


@celery.task(queue='try')
def some_task():
    print('Hooray')
Run Code Online (Sandbox Code Playgroud)

并运行命令: celery -A task_planner worker -l info -B