标签: celerybeat

如何用celery和Django以编程方式生成celerybeat条目

我希望能够以编程方式生成celerybeat条目并在添加条目时重新同步celerybeat.这里的文档陈述

默认情况下,条目取自CELERYBEAT_SCHEDULE设置,但也可以使用自定义存储,例如将条目存储在SQL数据库中.

所以我想弄清楚我需要扩展哪些类才能做到这一点.

我一直在研究芹菜调度程序文档djcelery api文档,但是关于这些方法中的一些方法所做的文档是不存在的,所以即将潜入某些来源并且只是希望有人可以指出我正确的方向.

我想我正在做的高水平会有所帮助......作为一个用户,我需要能够从一组预定义的任务中进行选择,并为用户提供一种方法来为其选择某种自定义时间表.执行,像每天/每周/每月,什么日期和时间.

这也是Django与djcelery.

UPDATE

我看到djcelery管理员的代码,但不清楚数据是如何持久化的.我目前有一个通用addTask视图,如下所示:

def addTask(request):

intervalSchedule = IntervalSchedule.from_schedule(schedule(timedelta(seconds=10)))
intervalSchedule.save()
modelData = dict(
    name="dcTestPersist",
    task="technologytrackerapi.views.createRecord",
    schedule=intervalSchedule,
)
periodicTask = PeriodicTask(**modelData)
periodicTask.save()
return render_to_response('taskView.html')
Run Code Online (Sandbox Code Playgroud)

数据在db中看起来是正确的但是当守护进程运行时它有以下错误:

[2012-03-06 00:23:07,926:警告/节拍]流程节拍:
[2012-03-06 00:23:07,926:警告/节拍]追溯(最近一次通话):
[2012-03-06 00 :23:07,926:警告/节拍]文件"/usr/lib/python2.7/multiprocessing/process.py",第258行,在_bootstrap中
[2012-03-06 00:23:07,926:警告/节拍]自我.运行()
[2012-03-06 00:23:07927:警告/节拍]文件"/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat .py",第464行,在运行中
[2012-03-06 00:23:07,927:警告/节拍] self.service.start(embedded_process = True)
[2012-03-06 00:23:07,927:警告/节拍]文件"/ home/dchesterman/Documents/PythonDev /.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py",第403行,开始
[2012-03-06 00:23 :07927:警告/节拍]间隔= self.scheduler.tick()
[2012-03-06 00:23:07927:警告/节拍]文件"/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/ lib/python2.7/site-packages/celery/beat.py",第194行,勾选
[2012-03-06 00:23:07,927:警告/是 在] next_time_to_run = self.maybe_due(条目,self.publisher)
[2012-03-06 00:23:07927:警告/节拍]文件"/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/ python2.7 /站点包/芹菜/ beat.py",线路170,在maybe_due
[2012-03-06 00:23:07927:警告/节拍] is_due,next_time_to_run = …

django celery django-celery celerybeat

11
推荐指数
1
解决办法
4521
查看次数

Django/Celery在localhost上有多个队列 - 路由无法正常工作

我跟着celery docs在我的开发机器上定义了2个队列.

我的芹菜设置:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}
Run Code Online (Sandbox Code Playgroud)

我在我的项目的virtualenv中打开了两个终端窗口,并运行以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker
Run Code Online (Sandbox Code Playgroud)

我得到的是所有任务都由两个队列处理.

我的目标是让一个队列只处理在其中定义的一个任务CELERY_ROUTES和默认队列来处理所有其他任务.

我也按照这个SO问题,rabbitmqctl list_queues …

python django celery celerybeat

11
推荐指数
1
解决办法
8206
查看次数

多个celerybeat实例用于弹性豆秆上的自动标定django app

我试图找出构建Django应用程序的最佳方法,该应用程序使用Celery在自动缩放AWS ElasticBeanstalk环境中处理异步和计划任务.

到目前为止,我只使用了Celery + Celerybeat的单个实例Elastic Beanstalk环境,这非常好用.但是,我希望在我的环境中运行多个实例,因为每个实例都会崩溃,并且在实例重新启动之前需要花费很多时间,但是我无法将当前的体系结构扩展到多个实例,因为Celerybeat应该在所有实例中只运行一次,否则Celerybeat安排的每个任务将被多次提交(对于环境中的每个EC2实例一次).

我已经阅读了多个解决方案,但是所有这些解决方案似乎都存在一些问题,这些问题并不适用于我:

  • 使用django cache + locking:这种方法更像是一个快速解决方案,而不是一个真正的解决方案.如果您有许多计划任务并且需要添加代码来检查每个任务的缓存,则这不是解决方案.此外,任务仍然会多次提交,这种方法只能确保重复项的执行停止.
  • 使用leader_only选项与ebextensions:做工精细最初,但如果在环境崩溃或EC2实例被替换,这将导致在没有Celerybeat是运行在所有的情况下,因为领导者是在创设环境只被定义一次.
  • 为Elastic Beanstalk工作层中的异步任务创建一个新的Django应用程序:很好,因为Web服务器和工作人员可以独立扩展,并且Web服务器性能不受工作人员执行的巨大异步工作负载的影响.但是,此方法不适用于Celery,因为工作层SQS守护程序会删除邮件并将邮件正文发布到预定义的URL.此外,我不喜欢有一个完整的额外Django应用程序的想法,需要从主应用程序导入模型,如果在主应用程序中修改任务需要单独更新和部署.

如何在分布式Elastic Beanstalk环境中将Celery与计划任务一起使用而不进行任务复制?例如,如何确保在Elastic Beanstalk环境中始终在所有实例中运行一个实例(即使Celerybeat的当前实例崩溃)?

有没有其他方法可以实现这一目标?在Django中使用Elastic Beanstalk的Worker Tier环境的最佳方法是什么?

python django celery celerybeat amazon-elastic-beanstalk

11
推荐指数
1
解决办法
675
查看次数

在构建 docker image level=error msg="无法关闭 tar writer: io: read/write on closed pipe"

time="2017-10-27T07:39:20Z" level=error msg="Can't add file /var/app/current/app/content_classifier/forest.pickle to tar: io: read/write on closed pipe" 
time="2017-10-27T07:39:20Z" level=error msg="Can't close tar writer: io: read/write on closed pipe"
Failed to build Docker image aws_beanstalk/staging-app:  tar writer: io: read/write on closed pipe" 
Error response from daemon: Untar error on re-exec cmd: fork/exec /proc/self/exe: cannot allocate memory. Check snapshot logs for details. (Executor::NonZeroExitStatus)
Run Code Online (Sandbox Code Playgroud)

我在 django_app/content_classifer 对象中有一个 Pickle 对象。有一个类可以预测结果,并且在 celery 任务文件初始化时它正在初始化。

docker build 在给出标题中的错误消息后一直失败。

linux celery celerybeat docker amazon-elastic-beanstalk

11
推荐指数
4
解决办法
2万
查看次数

芹菜在同一时间间隔组下击败了多个任务

我正在尝试设置两个每分钟都运行的任务.有没有办法将它们组合在一起运行?

CELERYBEAT_SCHEDULEceleryconfig.py下面指定:

CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2)
    },
}
Run Code Online (Sandbox Code Playgroud)

所以如果我想运行两个任务,我会期待这样的事情?

CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': ['tasks.add','task.multiply'],
        'schedule': [crontab(minute='*/1'),crontab(minute='*/1')],
        'args': [(1,2),(3,4)]
    },
}
Run Code Online (Sandbox Code Playgroud)

但它没有用.这样做有什么标准方法吗?

python celery flask celerybeat

10
推荐指数
1
解决办法
6147
查看次数

芹菜节拍时间表:芹菜开始时立即运行任务?

如果我创建一个芹菜节拍时间表,使用timedelta(days=1),第一个任务将在24小时后进行,引用芹菜节拍文件:

使用timedelta进行计划意味着任务将以30秒的间隔发送(第一个任务将在芹菜节拍开始后30秒发送,然后在最后一次运行后每30秒发送一次).

但事实是,在很多情况下,调度程序在启动时运行任务实际上很重要,但是我没有找到允许我在芹菜启动后立即运行任务的选项,我不仔细阅读,或者芹菜错过了这个功能吗?

python celery celerybeat

10
推荐指数
1
解决办法
2655
查看次数

如何在 Django 中为 celery beat 设置不同的工作日/周末时间表?

我如何在 celery beat 中以不同的方式在工作日和周末安排我的任务?

时间表在我的settings.py文件中设置如下

{
    "task_weekday": {
        "task": "tasks.my_regular_task",
        "schedule": crontab(minute="0-30", hour="4,5", day_of_week="mon-fri"),
        "options": {"queue": "queue_name"},
    },
    "task_weekend": {
        "task": "tasks.my_regular_task",
        "schedule": crontab(minute="0-5", hour="10,12", day_of_week="sat,sun"),
        "options": {"queue": "queue_name"},
    },
}
Run Code Online (Sandbox Code Playgroud)

但是,当我设置它时,它在今天(2021 年 3 月 21 日星期日)运行工作日计划,而不是选择周末计划。

我将应用程序时区设置为'US/Pacific'并且CELERY_ENABLE_UTC设置为False

设置后,我看到以下日志条目,但它运行工作日任务计划。

[2021-03-21 17:57:50,082: DEBUG/MainProcess] Current schedule:

<ScheduleEntry: task_weekday tasks.my_regular_task() <crontab: 0-30 4,5 mon-fri * * (m/h/d/dM/MY)>

<ScheduleEntry: task_weekend tasks.my_regular_task() <crontab: 0-5 10,12 sat,sun * * (m/h/d/dM/MY)>

<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * …
Run Code Online (Sandbox Code Playgroud)

django cron celery django-celery celerybeat

10
推荐指数
1
解决办法
134
查看次数

Celery和Redis的内存不足

我有一个部署到Heroku的Django应用程序,其中一个工作进程运行芹菜(+ celerycam用于监控).我使用RedisToGo的Redis数据库作为经纪人.我注意到Redis的内存不足.

这是我的procfile的样子:

web: python app/manage.py run_gunicorn -b "0.0.0.0:$PORT" -w 3
worker: python lipo/manage.py celerycam & python app/manage.py celeryd -E -B --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)

这是KEYS'*'的输出:

  1. "_kombu.binding.celeryd.pidbox"
  2. "celeryev.643a99be-74e8-44e1-8c67-fdd9891a5326"
  3. "celeryev.f7a1d511-448b-42ad-9e51-52baee60e977"
  4. "_kombu.binding.celeryev"
  5. "celeryev.d4bd2c8d-57ea-4058-8597-e48f874698ca"
  6. `_kombu.binding.celery"

celeryev.643a99be-74e8-44e1-8c67-fdd9891a5326 正在填写这些消息:

{"sw_sys": "Linux", "clock": 1, "timestamp": 1325914922.206671, "hostname": "064d9ffe-94a3-4a4e-b0c2-be9a85880c74", "type": "worker-online", "sw_ident": "celeryd", "sw_ver": "2.4.5"}
Run Code Online (Sandbox Code Playgroud)

知道如何定期清除这些消息吗?

django redis celery celeryd celerybeat

9
推荐指数
1
解决办法
2093
查看次数

芹菜autoreload无法正常工作

我正在使用Celery 3.1.16经纪人(运行RabbitMQ)和多名芹菜工人与celeryd通过主管守护.问题在于任务更新.当我更新我的tasks.py文件时,芹菜工作者运行旧代码.

芹菜发射命令:

/home/my_project/bin/celery -B --autoreload --app=my_app.celery:app worker --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)

我在django settings.py中包含任务文件:

CELERY_IMPORTS = [
    'my_app.tasks'
]
Run Code Online (Sandbox Code Playgroud)

pyinotify安装并工作(我猜是这样),芹菜日志的一部分:

[2014-12-16 20:56:00,016: INFO/MainProcess] Task my_app.tasks.periodic_update_task_statistic[175c2557-7c07-43c3-ac70-f4e115344134] succeeded in 0.00816309102811s: 'ok!'
[2014-12-16 20:56:11,157: INFO/MainProcess] Detected modified modules: ['my_app.tasks']
[2014-12-16 20:57:00,001: INFO/Beat] Scheduler: Sending due task my_app.tasks.periodic_update_task_statistic (my_app.tasks.periodic_update_task_statistic)
[2014-12-16 20:57:00,007: INFO/MainProcess] Received task: my_app.tasks.periodic_update_task_statistic[f22998a9-dcb4-4c29-8086-86dd6e57eae1]
Run Code Online (Sandbox Code Playgroud)

所以,我的问题是:如果修改了celery更新并应用新任务代码?

python django celery pyinotify celerybeat

9
推荐指数
1
解决办法
2842
查看次数

使用add_periodic_task动态设置Celery(celerybeat)中的周期性任务

我正在使用Celery 4.0.1,Django 1.10我有麻烦调度任务(运行任务工作正常).这是芹菜配置:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)
Run Code Online (Sandbox Code Playgroud)

然后在tasks.py中我有:

@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)
Run Code Online (Sandbox Code Playgroud)

在views.py中,我想安排此任务:

def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))
Run Code Online (Sandbox Code Playgroud)

然后我执行命令:

sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
Run Code Online (Sandbox Code Playgroud)

但是这项任务从未安排过.我在日志中看不到任何内容.这项任务正在起作用,因为如果在我看来我做了:

def my_view(request, …
Run Code Online (Sandbox Code Playgroud)

python django scheduled-tasks celery celerybeat

9
推荐指数
1
解决办法
1万
查看次数