标签: celerybeat

动态改变芹菜节拍设置

有没有办法告诉celerybeat在特定任务运行时更改设置?

在这个例子中最好地说明了这种效用:

我有一个定期任务,每隔30秒检查一次值.有时,基于外部触发器(我无法预测),我希望此任务将轮询频率提高到10秒 - 持续几分钟.

这是可行的吗?我知道我可以更改任务配置并重新加载芹菜,但这似乎是一种混乱的做事方式......

python scheduled-tasks celery celerybeat

7
推荐指数
1
解决办法
1833
查看次数

如何解决此错误?django + celery + rabbitmq + mysql + redis中的"RestartFreqExceeded:5 in 1s"

所以我用芹菜django.rabbitmq是经纪人.redis是缓存.mysql是db.(localhost中的所有内容)

  1. 我正在使用python2.7并使用基于virtualenv的虚拟环境
  2. 我在默认端口启动redis服务器(本地)
  3. 在一个新的终端,我跑

    python manage.py runserver
    
    Run Code Online (Sandbox Code Playgroud)
  4. 在一个新的终端我开始像这样的芹菜

    celery -A ds_django worker -B -l warning
    
    Run Code Online (Sandbox Code Playgroud)

这个程序过去常常有效.但现在当我运行celery命令时,我得到以下内容

    [2016-07-12 09:15:20,113: CRITICAL/MainProcess] Frequent restarts detected: RestartFreqExceeded('5 in 1s',)
    Traceback (most recent call last):
        File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site- packages/celery/worker/consumer.py", line 285, in start
            self._restart_state.step()
        File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site-packages/billiard/common.py", line 130, in step
            raise self.RestartFreqExceeded("%r in %rs" % (R, self.maxT))
     RestartFreqExceeded: 5 in 1s
Run Code Online (Sandbox Code Playgroud)

我完全不知道为什么会发生这种情况或如何解决这个问题.我一遍又一遍地搜索Google和StackOverFlow,但没有用.

我认为可能发生这种情况的一个原因是,有时我曾经一次经营过4名芹菜工人,然后将他们全部关闭.也许是因为它说重启频率超标,但不确定.而且不知道如何解决这个问题.可能是一些配置文件设置,但哪个设置和哪里也没有任何线索.

如果需要任何其他信息,我会提供.任何帮助表示赞赏.谢谢.

python celery django-celery celerybeat djcelery

7
推荐指数
1
解决办法
1793
查看次数

Celery Beat:一次限制为单个任务实例

我有芹菜和芹菜(四名工人)批量做一些加工步骤.其中一项任务大致是这样的,"对于每个没有创建Y的X,创建Y."

该任务以半快速(10秒)定期运行.任务很快完成.还有其他任务正在进行中.

我多次遇到这个问题,其中节拍任务显然已经积压,因此同时执行相同的任务(来自不同的节拍时间),导致错误的重复工作.似乎任务也是无序执行的.

  1. 是否有可能限制芹菜节拍以确保一次只有一个突出的任务实例?设置类似于rate_limit=5任务的东西是"正确"的方式吗?

  2. 是否有可能确保按顺序执行节拍任务,例如,不是分派任务,节拍将其添加到任务链中?

  3. 处理这个问题的最佳方法是什么,除了使这些任务本身以原子方式执行并且可以安全地同时执行?这不是我对预期任务的限制......

任务本身是天真地定义的:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return
Run Code Online (Sandbox Code Playgroud)

这是一个实际的(清理过的)日志:

  • [00:00.000]foocorp.tasks.add_y_to_xs发送.ID - >#1
  • [00:00.001] 收到的任务:foocorp.tasks.add_y_to_xs [#1]
  • [00:10.009]foocorp.tasks.add_y_to_xs发送.ID - >#2
  • [00:20.024]foocorp.tasks.add_y_to_xs发送.ID - >#3
  • [00:26.747] 收到的任务:foocorp.tasks.add_y_to_xs [#2]
  • [00:26.748] TaskPool:应用#2
  • [00:26.752] 收到的任务:foocorp.tasks.add_y_to_xs [#3]
  • [00:26.769] 接受的任务:foocorp.tasks.add_y_to_xs [#2] pid:26528
  • [00:26.775] 任务foocorp.tasks.add_y_to_xs [#2]成功完成0.0197986490093s:无
  • [00:26.806] TaskPool:应用#1
  • [00:26.836] TaskPool:应用#3
  • [01:30.020] 接受的任务:foocorp.tasks.add_y_to_xs [#1] pid:26526
  • [01:30.053] 接受的任务:foocorp.tasks.add_y_to_xs [#3] pid:26529
  • [01:30.055] foocorp.tasks.add_y_to_xs [#1]:为X id添加Y#9725
  • [01:30.070] foocorp.tasks.add_y_to_xs [#3]:为X id添加Y#9725
  • [01:30.074] 任务foocorp.tasks.add_y_to_xs [#1]成功完成0.0594762689434s:无
  • [01:30.087] 任务foocorp.tasks.add_y_to_xs [#3]成功完成0.0352867960464s:无

我们目前正在使用带有RabbitMQ的Celery 3.1.4作为传输.

编辑丹,这是我想出的: …

python concurrency rabbitmq celery celerybeat

6
推荐指数
2
解决办法
8074
查看次数

Celery死于DBPageNotFoundError

我有3台机器与芹菜工人和rabbitmq作为经纪人,一名工人正在运行与击败旗,所有这一切都由主管管理,有时芹菜死亡有这样的错误.此错误仅出现在节拍工作人员身上,但当出现时,所有计算机上的工作人员都会死亡.(celery == 3.1.12,kombu == 3.0.20)

[2014-07-05 08:37:04,297: INFO/MainProcess] Connected to amqp://user:**@192.168.15.106:5672//
[2014-07-05 08:37:04,311: ERROR/Beat] Process Beat
Traceback (most recent call last):
File "/var/projects/env/local/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
self.run()
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 527, in run
self.service.start(embedded_process=True)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 453, in start
humanize_seconds(self.scheduler.max_interval))
File "/var/projects/env/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 322, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 491, in scheduler
return self.get_scheduler()
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line 486, in get_scheduler
lazy=lazy)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/utils/imports.py", line 53, in instantiate
return symbol_by_name(name)(*args, **kwargs)
File "/var/projects/env/local/lib/python2.7/site-packages/celery/beat.py", line …
Run Code Online (Sandbox Code Playgroud)

python celery supervisord celerybeat

6
推荐指数
1
解决办法
2977
查看次数

在Elastic Beanstalk(AWS)中使用Ceebee Beetbeat

我试图将celerybeat作为Elastic beanstalk中的守护进程运行.这是我的配置文件:

files:
"/opt/python/log/django.log":
mode: "000666"
owner: ec2-user
group: ec2-user
content: |
  # Log file
encoding: plain
"/opt/elasticbeanstalk/hooks/appdeploy/post/run_supervised_celeryd.sh":
mode: "000755"
owner: root
group: root
content: |
  #!/usr/bin/env bash
  # Get django environment variables
  celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/%/%%/g' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`
  celeryenv=${celeryenv%?}

  # Create celery configuraiton script
  celeryconf="[program:celeryd]
  ; Set full path to celery program if using virtualenv
  command=/opt/python/run/venv/bin/celery worker -A avtotest --loglevel=INFO

  directory=/opt/python/current/app
  user=nobody
  numprocs=1 …
Run Code Online (Sandbox Code Playgroud)

python django amazon-web-services celery celerybeat

6
推荐指数
1
解决办法
1351
查看次数

阻止 Celery Beat 运行相同的任务

我每 30 秒有一个计划的 celery 运行任务。我有一个每天作为任务运行,另一个每周在用户指定的时间和星期几运行。它检查“开始时间”和“下一个预定日期”。在任务完成之前,下一个计划日期不会更新。

但是,我想知道如何确保 celery beat 只运行一次任务。我现在看到,芹菜将多次运行某个任务,直到该任务的下一个预定日期更新。

python django celery django-celery celerybeat

6
推荐指数
1
解决办法
1996
查看次数

python celery - how to add CELERYBEAT_SCHEDULE task at runtime to a worker?

I have created a celery worker with a single celerybeat schedule task which runs at 5 seconds time interval. How can I add another beat task dynamically to the celery worker without stopping it?

Example

app.conf.update(
   CELERY_TASK_RESULT_EXPIRES=3600,
   CELERY_TIMEZONE = 'UTC',
   CELERYBEAT_SCHEDULE = {
    'long-run-5-secs': {
        'task': 'test_proj.tasks.test',
        'schedule': timedelta(seconds=5),
        'args': (16, )
    }
   }
)
Run Code Online (Sandbox Code Playgroud)

With the above configuration, I am able to run the celery worker with beat mode successfully.

Now I need add the below beat schedule dynamically: …

python scheduler celery celerybeat djcelery

6
推荐指数
1
解决办法
3547
查看次数

Django 模型上的 Celery 任务

我正在尝试学习如何使用 celery 在我的一个模型上每天检查日期。我的一个模型包含一个到期日期和一个布尔字段,表明他们的保险是否已过期。

模型很大,所以我要发布一个精简版。我想我有两个选择。要么在模型方法上运行 celery 任务,要么在我的 tasks.py 中重写该函数。然后我需要使用 Celery beat 来运行日程表来每天检查。

我有这个功能可以工作,但我直接传递了我认为错误的模型对象。

我也遇到了如何在 celery.py 中的 celery beat 调度程序中使用 args 的问题。

我真的很接近让这个工作,但我想我会以错误的方式执行任务。我认为在模型方法上执行任务可能是最干净的,我只是不确定如何完成它。

模型.py

class CarrierCompany(models.Model):
    name = models.CharField(max_length=255, unique=True)
    insurance_expiration = models.DateTimeField(null=True)
    insurance_active = models.BooleanField()

    def insurance_expiration_check(self):
        if self.insurance_expiration > datetime.today().date():
            self.insurance_active = True
            self.save()
            print("Insurance Active")
        else:
            self.insurance_active = False
            self.save()
            print("Insurance Inactive")
Run Code Online (Sandbox Code Playgroud)

任务.py

from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from datetime import datetime, date
from django.utils import timezone
from .models import CarrierCompany



@task(name="insurance_expired")
def insurance_date():
    carriers …
Run Code Online (Sandbox Code Playgroud)

django celery celery-task celerybeat

6
推荐指数
1
解决办法
4849
查看次数

如何禁用芹菜中的重试?

我每 15 分钟运行一次 celerybeat 调度程序,我需要从 API 获取数据(速率限制 = 300 个请求/分钟最大值)并将结果存储到数据库中。我想同时在速率限制下并行获取 url。如果任何工人在这里失败,我不想重试,因为我会在 15 分钟后再次 ping。关于如何在芹菜中实现这一点的任何建议。

@celery.task(bind=True)
def fetch_store(self):
    start = time()
    return c.chain(c.group(emap.s() for _ in range(2000)), ereduce.s(start)).apply_async()

@celery.task(rate_limit='300/m')
def fetch():
    #... requests data from external API
    return data

@celery.task
def store(numbers, start):
    end = time()
    logger.info("Received" + numbers + " " + (end - start)/1000 + "seconds")
Run Code Online (Sandbox Code Playgroud)

celery python-3.x celerybeat

6
推荐指数
1
解决办法
566
查看次数

如何动态改变芹菜节拍的时间表?

我正在使用芹菜 4.3.0。我正在尝试根据 json 文件中的时间表每 5 秒更新一次 celery beat 的时间表,以便当我手动编辑、添加或删除该 json 文件中的计划任务时,celery beat 会获取更改调度程序而无需重新启动它。

我尝试的是创建一个任务,通过更新app.conf['CELERYBEAT_SCHEDULE']. 该任务每 5 秒成功运行一次,但 celery beat 不会更新到新的计划,即使我设置beat_max_loop_interval为 1 秒。

任务.py

from celery import Celery

app = Celery("tasks", backend='redis://', broker='redis://')
app.config_from_object('celeryconfig')

@app.task
def hello_world():
    return "Hello World!"

@app.task
def update_schedule():
    with open("path_to_scheduler.json", "r") as f:
        app.conf['CELERYBEAT_SCHEDULE'] = json.load(f)
Run Code Online (Sandbox Code Playgroud)

芹菜配置文件

beat_max_loop_interval = 1  # Maximum number of seconds beat can sleep between checking the schedule

beat_schedule = {
    "greet-every-10-seconds": {
        "task": "tasks.hello_world",
        "schedule": 10.0
    }, …
Run Code Online (Sandbox Code Playgroud)

python scheduled-tasks job-scheduling celery celerybeat

6
推荐指数
0
解决办法
2096
查看次数