我希望能够以编程方式生成celerybeat条目并在添加条目时重新同步celerybeat.这里的文档陈述
默认情况下,条目取自CELERYBEAT_SCHEDULE设置,但也可以使用自定义存储,例如将条目存储在SQL数据库中.
所以我想弄清楚我需要扩展哪些类才能做到这一点.
我一直在研究芹菜调度程序文档和djcelery api文档,但是关于这些方法中的一些方法所做的文档是不存在的,所以即将潜入某些来源并且只是希望有人可以指出我正确的方向.
我想我正在做的高水平会有所帮助......作为一个用户,我需要能够从一组预定义的任务中进行选择,并为用户提供一种方法来为其选择某种自定义时间表.执行,像每天/每周/每月,什么日期和时间.
这也是Django与djcelery.
UPDATE
我看到djcelery管理员的代码,但不清楚数据是如何持久化的.我目前有一个通用addTask
视图,如下所示:
def addTask(request):
intervalSchedule = IntervalSchedule.from_schedule(schedule(timedelta(seconds=10)))
intervalSchedule.save()
modelData = dict(
name="dcTestPersist",
task="technologytrackerapi.views.createRecord",
schedule=intervalSchedule,
)
periodicTask = PeriodicTask(**modelData)
periodicTask.save()
return render_to_response('taskView.html')
Run Code Online (Sandbox Code Playgroud)
数据在db中看起来是正确的但是当守护进程运行时它有以下错误:
[2012-03-06 00:23:07,926:警告/节拍]流程节拍:
[2012-03-06 00:23:07,926:警告/节拍]追溯(最近一次通话):
[2012-03-06 00 :23:07,926:警告/节拍]文件"/usr/lib/python2.7/multiprocessing/process.py",第258行,在_bootstrap中
[2012-03-06 00:23:07,926:警告/节拍]自我.运行()
[2012-03-06 00:23:07927:警告/节拍]文件"/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat .py",第464行,在运行中
[2012-03-06 00:23:07,927:警告/节拍] self.service.start(embedded_process = True)
[2012-03-06 00:23:07,927:警告/节拍]文件"/ home/dchesterman/Documents/PythonDev /.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py",第403行,开始
[2012-03-06 00:23 :07927:警告/节拍]间隔= self.scheduler.tick()
[2012-03-06 00:23:07927:警告/节拍]文件"/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/ lib/python2.7/site-packages/celery/beat.py",第194行,勾选
[2012-03-06 00:23:07,927:警告/是 在] next_time_to_run = self.maybe_due(条目,self.publisher)
[2012-03-06 00:23:07927:警告/节拍]文件"/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/ python2.7 /站点包/芹菜/ beat.py",线路170,在maybe_due
[2012-03-06 00:23:07927:警告/节拍] is_due,next_time_to_run = …
我跟着celery docs在我的开发机器上定义了2个队列.
我的芹菜设置:
CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60 # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'fs_feeds',
},
}
Run Code Online (Sandbox Code Playgroud)
我在我的项目的virtualenv中打开了两个终端窗口,并运行以下命令:
terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker
Run Code Online (Sandbox Code Playgroud)
我得到的是所有任务都由两个队列处理.
我的目标是让一个队列只处理在其中定义的一个任务CELERY_ROUTES
和默认队列来处理所有其他任务.
我也按照这个SO问题,rabbitmqctl list_queues …
我试图找出构建Django应用程序的最佳方法,该应用程序使用Celery在自动缩放AWS ElasticBeanstalk环境中处理异步和计划任务.
到目前为止,我只使用了Celery + Celerybeat的单个实例Elastic Beanstalk环境,这非常好用.但是,我希望在我的环境中运行多个实例,因为每个实例都会崩溃,并且在实例重新启动之前需要花费很多时间,但是我无法将当前的体系结构扩展到多个实例,因为Celerybeat应该在所有实例中只运行一次,否则Celerybeat安排的每个任务将被多次提交(对于环境中的每个EC2实例一次).
我已经阅读了多个解决方案,但是所有这些解决方案似乎都存在一些问题,这些问题并不适用于我:
如何在分布式Elastic Beanstalk环境中将Celery与计划任务一起使用而不进行任务复制?例如,如何确保在Elastic Beanstalk环境中始终在所有实例中运行一个实例(即使Celerybeat的当前实例崩溃)?
有没有其他方法可以实现这一目标?在Django中使用Elastic Beanstalk的Worker Tier环境的最佳方法是什么?
time="2017-10-27T07:39:20Z" level=error msg="Can't add file /var/app/current/app/content_classifier/forest.pickle to tar: io: read/write on closed pipe"
time="2017-10-27T07:39:20Z" level=error msg="Can't close tar writer: io: read/write on closed pipe"
Failed to build Docker image aws_beanstalk/staging-app: tar writer: io: read/write on closed pipe"
Error response from daemon: Untar error on re-exec cmd: fork/exec /proc/self/exe: cannot allocate memory. Check snapshot logs for details. (Executor::NonZeroExitStatus)
Run Code Online (Sandbox Code Playgroud)
我在 django_app/content_classifer 对象中有一个 Pickle 对象。有一个类可以预测结果,并且在 celery 任务文件初始化时它正在初始化。
docker build
在给出标题中的错误消息后一直失败。
我正在尝试设置两个每分钟都运行的任务.有没有办法将它们组合在一起运行?
我CELERYBEAT_SCHEDULE
在celeryconfig.py
下面指定:
CELERYBEAT_SCHEDULE = {
'every-minute': {
'task': 'tasks.add',
'schedule': crontab(minute='*/1'),
'args': (1,2)
},
}
Run Code Online (Sandbox Code Playgroud)
所以如果我想运行两个任务,我会期待这样的事情?
CELERYBEAT_SCHEDULE = {
'every-minute': {
'task': ['tasks.add','task.multiply'],
'schedule': [crontab(minute='*/1'),crontab(minute='*/1')],
'args': [(1,2),(3,4)]
},
}
Run Code Online (Sandbox Code Playgroud)
但它没有用.这样做有什么标准方法吗?
如果我创建一个芹菜节拍时间表,使用timedelta(days=1)
,第一个任务将在24小时后进行,引用芹菜节拍文件:
使用timedelta进行计划意味着任务将以30秒的间隔发送(第一个任务将在芹菜节拍开始后30秒发送,然后在最后一次运行后每30秒发送一次).
但事实是,在很多情况下,调度程序在启动时运行任务实际上很重要,但是我没有找到允许我在芹菜启动后立即运行任务的选项,我不仔细阅读,或者芹菜错过了这个功能吗?
我如何在 celery beat 中以不同的方式在工作日和周末安排我的任务?
时间表在我的settings.py
文件中设置如下
{
"task_weekday": {
"task": "tasks.my_regular_task",
"schedule": crontab(minute="0-30", hour="4,5", day_of_week="mon-fri"),
"options": {"queue": "queue_name"},
},
"task_weekend": {
"task": "tasks.my_regular_task",
"schedule": crontab(minute="0-5", hour="10,12", day_of_week="sat,sun"),
"options": {"queue": "queue_name"},
},
}
Run Code Online (Sandbox Code Playgroud)
但是,当我设置它时,它在今天(2021 年 3 月 21 日星期日)运行工作日计划,而不是选择周末计划。
我将应用程序时区设置为'US/Pacific'
并且CELERY_ENABLE_UTC
设置为False
。
设置后,我看到以下日志条目,但它运行工作日任务计划。
[2021-03-21 17:57:50,082: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: task_weekday tasks.my_regular_task() <crontab: 0-30 4,5 mon-fri * * (m/h/d/dM/MY)>
<ScheduleEntry: task_weekend tasks.my_regular_task() <crontab: 0-5 10,12 sat,sun * * (m/h/d/dM/MY)>
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * …
Run Code Online (Sandbox Code Playgroud) 我有一个部署到Heroku的Django应用程序,其中一个工作进程运行芹菜(+ celerycam用于监控).我使用RedisToGo的Redis数据库作为经纪人.我注意到Redis的内存不足.
这是我的procfile的样子:
web: python app/manage.py run_gunicorn -b "0.0.0.0:$PORT" -w 3
worker: python lipo/manage.py celerycam & python app/manage.py celeryd -E -B --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)
这是KEYS'*'的输出:
celeryev.643a99be-74e8-44e1-8c67-fdd9891a5326
正在填写这些消息:
{"sw_sys": "Linux", "clock": 1, "timestamp": 1325914922.206671, "hostname": "064d9ffe-94a3-4a4e-b0c2-be9a85880c74", "type": "worker-online", "sw_ident": "celeryd", "sw_ver": "2.4.5"}
Run Code Online (Sandbox Code Playgroud)
知道如何定期清除这些消息吗?
我正在使用Celery 3.1.16经纪人(运行RabbitMQ)和多名芹菜工人与celeryd通过主管守护.问题在于任务更新.当我更新我的tasks.py文件时,芹菜工作者运行旧代码.
芹菜发射命令:
/home/my_project/bin/celery -B --autoreload --app=my_app.celery:app worker --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)
我在django settings.py中包含任务文件:
CELERY_IMPORTS = [
'my_app.tasks'
]
Run Code Online (Sandbox Code Playgroud)
pyinotify安装并工作(我猜是这样),芹菜日志的一部分:
[2014-12-16 20:56:00,016: INFO/MainProcess] Task my_app.tasks.periodic_update_task_statistic[175c2557-7c07-43c3-ac70-f4e115344134] succeeded in 0.00816309102811s: 'ok!'
[2014-12-16 20:56:11,157: INFO/MainProcess] Detected modified modules: ['my_app.tasks']
[2014-12-16 20:57:00,001: INFO/Beat] Scheduler: Sending due task my_app.tasks.periodic_update_task_statistic (my_app.tasks.periodic_update_task_statistic)
[2014-12-16 20:57:00,007: INFO/MainProcess] Received task: my_app.tasks.periodic_update_task_statistic[f22998a9-dcb4-4c29-8086-86dd6e57eae1]
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是:如果修改了celery更新并应用新任务代码?
我正在使用Celery 4.0.1
,Django 1.10
我有麻烦调度任务(运行任务工作正常).这是芹菜配置:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2
app.conf.CELERY_QUEUES = (
Queue('myapp.celery_default'),
Queue('myapp.queue1'),
Queue('myapp.queue2'),
Queue('myapp.queue3'),
)
Run Code Online (Sandbox Code Playgroud)
然后在tasks.py中我有:
@app.task(queue='myapp.queue1')
def my_task(some_id):
print("Doing something with", some_id)
Run Code Online (Sandbox Code Playgroud)
在views.py中,我想安排此任务:
def my_view(request, id):
app.add_periodic_task(10, my_task.s(id))
Run Code Online (Sandbox Code Playgroud)
然后我执行命令:
sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
Run Code Online (Sandbox Code Playgroud)
但是这项任务从未安排过.我在日志中看不到任何内容.这项任务正在起作用,因为如果在我看来我做了:
def my_view(request, …
Run Code Online (Sandbox Code Playgroud)