有没有办法告诉celerybeat在特定任务运行时更改设置?
在这个例子中最好地说明了这种效用:
我有一个定期任务,每隔30秒检查一次值.有时,基于外部触发器(我无法预测),我希望此任务将轮询频率提高到10秒 - 持续几分钟.
这是可行的吗?我知道我可以更改任务配置并重新加载芹菜,但这似乎是一种混乱的做事方式......
所以我用芹菜django.rabbitmq是经纪人.redis是缓存.mysql是db.(localhost中的所有内容)
在一个新的终端,我跑
python manage.py runserver
Run Code Online (Sandbox Code Playgroud)在一个新的终端我开始像这样的芹菜
celery -A ds_django worker -B -l warning
Run Code Online (Sandbox Code Playgroud)这个程序过去常常有效.但现在当我运行celery命令时,我得到以下内容
[2016-07-12 09:15:20,113: CRITICAL/MainProcess] Frequent restarts detected: RestartFreqExceeded('5 in 1s',)
Traceback (most recent call last):
File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site- packages/celery/worker/consumer.py", line 285, in start
self._restart_state.step()
File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site-packages/billiard/common.py", line 130, in step
raise self.RestartFreqExceeded("%r in %rs" % (R, self.maxT))
RestartFreqExceeded: 5 in 1s
Run Code Online (Sandbox Code Playgroud)
我完全不知道为什么会发生这种情况或如何解决这个问题.我一遍又一遍地搜索Google和StackOverFlow,但没有用.
我认为可能发生这种情况的一个原因是,有时我曾经一次经营过4名芹菜工人,然后将他们全部关闭.也许是因为它说重启频率超标,但不确定.而且不知道如何解决这个问题.可能是一些配置文件设置,但哪个设置和哪里也没有任何线索.
如果需要任何其他信息,我会提供.任何帮助表示赞赏.谢谢.
我有芹菜和芹菜(四名工人)批量做一些加工步骤.其中一项任务大致是这样的,"对于每个没有创建Y的X,创建Y."
该任务以半快速(10秒)定期运行.任务很快完成.还有其他任务正在进行中.
我多次遇到这个问题,其中节拍任务显然已经积压,因此同时执行相同的任务(来自不同的节拍时间),导致错误的重复工作.似乎任务也是无序执行的.
是否有可能限制芹菜节拍以确保一次只有一个突出的任务实例?设置类似于rate_limit=5任务的东西是"正确"的方式吗?
是否有可能确保按顺序执行节拍任务,例如,不是分派任务,节拍将其添加到任务链中?
处理这个问题的最佳方法是什么,除了使这些任务本身以原子方式执行并且可以安全地同时执行?这不是我对预期任务的限制......
任务本身是天真地定义的:
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
Run Code Online (Sandbox Code Playgroud)
这是一个实际的(清理过的)日志:
[00:00.000]foocorp.tasks.add_y_to_xs发送.ID - >#1[00:00.001] 收到的任务:foocorp.tasks.add_y_to_xs [#1][00:10.009]foocorp.tasks.add_y_to_xs发送.ID - >#2[00:20.024]foocorp.tasks.add_y_to_xs发送.ID - >#3[00:26.747] 收到的任务:foocorp.tasks.add_y_to_xs [#2][00:26.748] TaskPool:应用#2[00:26.752] 收到的任务:foocorp.tasks.add_y_to_xs [#3][00:26.769] 接受的任务:foocorp.tasks.add_y_to_xs [#2] pid:26528[00:26.775] 任务foocorp.tasks.add_y_to_xs [#2]成功完成0.0197986490093s:无[00:26.806] TaskPool:应用#1[00:26.836] TaskPool:应用#3[01:30.020] 接受的任务:foocorp.tasks.add_y_to_xs [#1] pid:26526[01:30.053] 接受的任务:foocorp.tasks.add_y_to_xs [#3] pid:26529[01:30.055] foocorp.tasks.add_y_to_xs [#1]:为X id添加Y#9725[01:30.070] foocorp.tasks.add_y_to_xs [#3]:为X id添加Y#9725[01:30.074] 任务foocorp.tasks.add_y_to_xs [#1]成功完成0.0594762689434s:无[01:30.087] 任务foocorp.tasks.add_y_to_xs [#3]成功完成0.0352867960464s:无我们目前正在使用带有RabbitMQ的Celery 3.1.4作为传输.
编辑丹,这是我想出的: …
我有3台机器与芹菜工人和rabbitmq作为经纪人,一名工人正在运行与击败旗,所有这一切都由主管管理,有时芹菜死亡有这样的错误.此错误仅出现在节拍工作人员身上,但当出现时,所有计算机上的工作人员都会死亡.(celery == 3.1.12,kombu == 3.0.20)
[2014-07-05 08:37:04,297: INFO/MainProcess] Connected to amqp://user:**@192.168.15.106:5672//
[2014-07-05 08:37:04,311: ERROR/Beat] Process Beat
Traceback (most recent call last):
File "/var/projects/env/local/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
self.run()
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 527, in run
self.service.start(embedded_process=True)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 453, in start
humanize_seconds(self.scheduler.max_interval))
File "/var/projects/env/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 322, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 491, in scheduler
return self.get_scheduler()
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 486, in get_scheduler
lazy=lazy)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/utils/imports.py", line 53, in instantiate
return symbol_by_name(name)(*args, **kwargs)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line …Run Code Online (Sandbox Code Playgroud) 我试图将celerybeat作为Elastic beanstalk中的守护进程运行.这是我的配置文件:
files:
"/opt/python/log/django.log":
mode: "000666"
owner: ec2-user
group: ec2-user
content: |
# Log file
encoding: plain
"/opt/elasticbeanstalk/hooks/appdeploy/post/run_supervised_celeryd.sh":
mode: "000755"
owner: root
group: root
content: |
#!/usr/bin/env bash
# Get django environment variables
celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/%/%%/g' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`
celeryenv=${celeryenv%?}
# Create celery configuraiton script
celeryconf="[program:celeryd]
; Set full path to celery program if using virtualenv
command=/opt/python/run/venv/bin/celery worker -A avtotest --loglevel=INFO
directory=/opt/python/current/app
user=nobody
numprocs=1 …Run Code Online (Sandbox Code Playgroud) 我每 30 秒有一个计划的 celery 运行任务。我有一个每天作为任务运行,另一个每周在用户指定的时间和星期几运行。它检查“开始时间”和“下一个预定日期”。在任务完成之前,下一个计划日期不会更新。
但是,我想知道如何确保 celery beat 只运行一次任务。我现在看到,芹菜将多次运行某个任务,直到该任务的下一个预定日期更新。
I have created a celery worker with a single celerybeat schedule task which runs at 5 seconds time interval. How can I add another beat task dynamically to the celery worker without stopping it?
Example
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_TIMEZONE = 'UTC',
CELERYBEAT_SCHEDULE = {
'long-run-5-secs': {
'task': 'test_proj.tasks.test',
'schedule': timedelta(seconds=5),
'args': (16, )
}
}
)
Run Code Online (Sandbox Code Playgroud)
With the above configuration, I am able to run the celery worker with beat mode successfully.
Now I need add the below beat schedule dynamically: …
我正在尝试学习如何使用 celery 在我的一个模型上每天检查日期。我的一个模型包含一个到期日期和一个布尔字段,表明他们的保险是否已过期。
模型很大,所以我要发布一个精简版。我想我有两个选择。要么在模型方法上运行 celery 任务,要么在我的 tasks.py 中重写该函数。然后我需要使用 Celery beat 来运行日程表来每天检查。
我有这个功能可以工作,但我直接传递了我认为错误的模型对象。
我也遇到了如何在 celery.py 中的 celery beat 调度程序中使用 args 的问题。
我真的很接近让这个工作,但我想我会以错误的方式执行任务。我认为在模型方法上执行任务可能是最干净的,我只是不确定如何完成它。
模型.py
class CarrierCompany(models.Model):
name = models.CharField(max_length=255, unique=True)
insurance_expiration = models.DateTimeField(null=True)
insurance_active = models.BooleanField()
def insurance_expiration_check(self):
if self.insurance_expiration > datetime.today().date():
self.insurance_active = True
self.save()
print("Insurance Active")
else:
self.insurance_active = False
self.save()
print("Insurance Inactive")
Run Code Online (Sandbox Code Playgroud)
任务.py
from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from datetime import datetime, date
from django.utils import timezone
from .models import CarrierCompany
@task(name="insurance_expired")
def insurance_date():
carriers …Run Code Online (Sandbox Code Playgroud) 我每 15 分钟运行一次 celerybeat 调度程序,我需要从 API 获取数据(速率限制 = 300 个请求/分钟最大值)并将结果存储到数据库中。我想同时在速率限制下并行获取 url。如果任何工人在这里失败,我不想重试,因为我会在 15 分钟后再次 ping。关于如何在芹菜中实现这一点的任何建议。
@celery.task(bind=True)
def fetch_store(self):
start = time()
return c.chain(c.group(emap.s() for _ in range(2000)), ereduce.s(start)).apply_async()
@celery.task(rate_limit='300/m')
def fetch():
#... requests data from external API
return data
@celery.task
def store(numbers, start):
end = time()
logger.info("Received" + numbers + " " + (end - start)/1000 + "seconds")
Run Code Online (Sandbox Code Playgroud) 我正在使用芹菜 4.3.0。我正在尝试根据 json 文件中的时间表每 5 秒更新一次 celery beat 的时间表,以便当我手动编辑、添加或删除该 json 文件中的计划任务时,celery beat 会获取更改调度程序而无需重新启动它。
我尝试的是创建一个任务,通过更新app.conf['CELERYBEAT_SCHEDULE']. 该任务每 5 秒成功运行一次,但 celery beat 不会更新到新的计划,即使我设置beat_max_loop_interval为 1 秒。
任务.py
from celery import Celery
app = Celery("tasks", backend='redis://', broker='redis://')
app.config_from_object('celeryconfig')
@app.task
def hello_world():
return "Hello World!"
@app.task
def update_schedule():
with open("path_to_scheduler.json", "r") as f:
app.conf['CELERYBEAT_SCHEDULE'] = json.load(f)
Run Code Online (Sandbox Code Playgroud)
芹菜配置文件
beat_max_loop_interval = 1 # Maximum number of seconds beat can sleep between checking the schedule
beat_schedule = {
"greet-every-10-seconds": {
"task": "tasks.hello_world",
"schedule": 10.0
}, …Run Code Online (Sandbox Code Playgroud) celery ×10
celerybeat ×10
python ×8
django ×3
djcelery ×2
celery-task ×1
concurrency ×1
python-3.x ×1
rabbitmq ×1
scheduler ×1
supervisord ×1