我使用RedisToGo Nano插件在Heroku上使用celerybeat
有一个网络dyno和一个工人dyno
celerybeat工作人员每分钟都要执行一项任务.
问题是:每当我部署新的提交时,dynos重启,我就会收到此错误
2014-02-27T13:19:31.552352+00:00 app[worker.1]: Traceback (most recent call last):
2014-02-27T13:19:31.552352+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/worker/consumer.py", line 389, in start
2014-02-27T13:19:31.552352+00:00 app[worker.1]: self.reset_connection()
2014-02-27T13:19:31.552352+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/worker/consumer.py", line 727, in reset_connection
2014-02-27T13:19:31.552352+00:00 app[worker.1]: self.connection = self._open_connection()
2014-02-27T13:19:31.552352+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/worker/consumer.py", line 792, in _open_connection
2014-02-27T13:19:31.552352+00:00 app[worker.1]: callback=self.maybe_shutdown)
2014-02-27T13:18:23.864287+00:00 app[worker.1]: self.on_connect()
2014-02-27T13:18:23.864287+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/connection.py", line 263, in on_connect
2014-02-27T13:18:23.864287+00:00 app[worker.1]: if nativestr(self.read_response()) != 'OK':
2014-02-27T13:18:23.864287+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/connection.py", line 314, in read_response
2014-02-27T13:18:23.864287+00:00 app[worker.1]: raise response
2014-02-27T13:18:23.864287+00:00 app[worker.1]: ResponseError: max …Run Code Online (Sandbox Code Playgroud) 有没有办法告诉celerybeat在特定任务运行时更改设置?
在这个例子中最好地说明了这种效用:
我有一个定期任务,每隔30秒检查一次值.有时,基于外部触发器(我无法预测),我希望此任务将轮询频率提高到10秒 - 持续几分钟.
这是可行的吗?我知道我可以更改任务配置并重新加载芹菜,但这似乎是一种混乱的做事方式......
所以我用芹菜django.rabbitmq是经纪人.redis是缓存.mysql是db.(localhost中的所有内容)
在一个新的终端,我跑
python manage.py runserver
Run Code Online (Sandbox Code Playgroud)在一个新的终端我开始像这样的芹菜
celery -A ds_django worker -B -l warning
Run Code Online (Sandbox Code Playgroud)这个程序过去常常有效.但现在当我运行celery命令时,我得到以下内容
[2016-07-12 09:15:20,113: CRITICAL/MainProcess] Frequent restarts detected: RestartFreqExceeded('5 in 1s',)
Traceback (most recent call last):
File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site- packages/celery/worker/consumer.py", line 285, in start
self._restart_state.step()
File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site-packages/billiard/common.py", line 130, in step
raise self.RestartFreqExceeded("%r in %rs" % (R, self.maxT))
RestartFreqExceeded: 5 in 1s
Run Code Online (Sandbox Code Playgroud)
我完全不知道为什么会发生这种情况或如何解决这个问题.我一遍又一遍地搜索Google和StackOverFlow,但没有用.
我认为可能发生这种情况的一个原因是,有时我曾经一次经营过4名芹菜工人,然后将他们全部关闭.也许是因为它说重启频率超标,但不确定.而且不知道如何解决这个问题.可能是一些配置文件设置,但哪个设置和哪里也没有任何线索.
如果需要任何其他信息,我会提供.任何帮助表示赞赏.谢谢.
有没有办法启动芹菜工人并在一个命令中击败?我想在使用Fabric的自动部署过程中添加芹菜.
我目前正在运行:
celery -A prj worker -B
Run Code Online (Sandbox Code Playgroud)
其次是
celery -A prj beat -l info -S django
Run Code Online (Sandbox Code Playgroud)
但是,第一个命令启动worker,并且不允许运行下一个命令(启动节拍),因为出现了worker启动消息.
有没有办法避免出现启动消息?或者在一个命令中执行这两个操作?也许有一种方法可以从我的Django配置启动这些?
谢谢!
Django 2.1.1、Django 频道 2.1.3、芹菜 4.2.1
我已经在 Celery 中设置了一个任务,在任务结束时,我需要向客户端发送一个 websocket 消息。但是,永远不会发送 websocket 消息。没有抛出任何错误,它只是不发送。
我已经设置了一个使用 Redis 作为后端的通道层。从普通的 Django 视图执行此操作可以正常工作。但是当在 Celery 任务中运行时,它会将消息发送到 Channels,我可以看到 Channels 确实运行了下面我的consumers.py 代码中显示的代码,但客户端从未收到 websocket 消息。
任务.py
def import_job(self):
# (do task calculations, store in data dict)
message = {'type': 'send_my_data',
'data': json.dumps(thecalcs) }
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)('core-data', message)
Run Code Online (Sandbox Code Playgroud)
消费者.py
class AsyncDataConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.channel_group_name = 'core-data'
# Join the group
await self.channel_layer.group_add(
self.channel_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# Leave the group
await self.channel_layer.group_discard(
self.channel_group_name,
self.channel_name …Run Code Online (Sandbox Code Playgroud) 我在 django 项目中处理 celery beat 任务,该项目定期创建数据库条目。我知道是因为当我像这样设置任务时:
芹菜.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
app = Celery("clock-backend", broker=os.environ.get("RABBITMQ_URL"))
app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.beat_schedule = {
'create_reports_monthly': {
'task': 'project_celery.tasks.create_reports_monthly',
'schedule': 10.0,
},
}
app.autodiscover_tasks()
Run Code Online (Sandbox Code Playgroud)
并开始我的项目,它真的每 10 秒创建一个对象。
但我真正想做的是将它设置为每个月的第一天运行。
这样做我会改变"schedule": crontab(0, 0, day_of_month="1")。
我的实际问题来了:我如何测试这是否真的有效?
通过测试,我指的是实际(单元)测试。
我尝试过的是使用名为freezegun的包。一个这样的测试看起来像这样:
def test_start_of_month_report_creation(self, user_object, contract_object, report_object):
# set time to the last day of January
with freeze_time("2019-01-31 23:59:59") as frozen_time:
# let one second pass
frozen_time.tick() …Run Code Online (Sandbox Code Playgroud) 我有 celery beat 的计划任务每 3 小时运行一次:
'sync_stuff': {
'task': 'celery_tasks.sync_stuff',
'schedule': crontab(hour='*/3')
}
Run Code Online (Sandbox Code Playgroud)
有时完成任务需要超过 3 个小时,我想确保 celery 不会在旧实例仍在运行时再次安排和运行任务。
有没有办法只用 celery 或 celerybeat 设置来做到这一点?
我正在尝试使用 celerybeat 和 docker 为我的 Flask 应用程序运行定期 celery 任务。但是,当我运行容器时,出现以下错误:
Removing corrupted schedule file 'celerybeat-schedule': error(22, 'Invalid argument')
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'scheduler'
Run Code Online (Sandbox Code Playgroud)
我在我的内部定义了我的节拍调度程序,settings.py如下所示:
CELERYBEAT_SCHEDULE = {
'fetch-expensify-reports': {
'task': 'canopact.blueprints.carbon.tasks.fetch_reports',
'schedule': 10.0
}
}
Run Code Online (Sandbox Code Playgroud)
此配置被传递到create_celery_app我的文件中的函数中app.py:
def create_celery_app(app=None):
"""
Create a new Celery object and tie together the Celery config to the app's
config. Wrap all tasks in the context of the application.
:param app: …Run Code Online (Sandbox Code Playgroud) 经过相当多的尝试和错误并一步步尝试找到解决方案后,我想我在这里分享问题并根据我发现的内容自己回答它们。除了一些零碎的内容之外,没有太多关于此的文档,这希望将来能对其他人有所帮助。
请注意,这是特定于Django、Celery、Redis和Digital Ocean App Platform的。
这主要是关于以下错误和进一步产生的影响:
OSError:[Errno 38]功能未实现
和
无法连接到redis://......
celery -A your_app worker --beat -l info
当您尝试在应用程序平台上运行 celery 命令或类似命令时,会发生第一个错误。目前数字海洋似乎不支持此功能。当您犯了许多潜在错误时,就会出现第二个错误。
情况
我有一个芹菜任务,我在不同的时区为每个客户运行。
基本上,对于数据库中的每个客户,我都会获取时区,然后以这种方式设置 celery 任务。
'schedule': crontab(minute=30, hour=14, nowfun=self.now_function)
Run Code Online (Sandbox Code Playgroud)
基本上,我想要的是在客户时区 14:30 运行的任务。因此就有了 now_function。
我的now_function只是获取客户时区的当前时间。
def now_function(self):
"""
return the now function for the task
this is used to compute the time a task should be scheduled for a given customer
"""
return datetime.now(timezone(self.customer.c_timezone))
Run Code Online (Sandbox Code Playgroud)
怎么了
我的任务运行时间不一致,有时它们在预期时间运行,所以假设客户时区是 14:30,如果时区是America/Chicago20:30,那么这是我的预期行为。
有时,它在 14:30 运行,这正是 UTC 时间。
我正在跟踪任务在正确时间运行的当天和卡片在错误时间运行的当天是否存在某种模式。
附加信息
我已在 celery 4.4.2 和 5.xx 上尝试过此操作,但它仍然具有相同的行为。
这是我的芹菜配置。
CELERY_REDIS_SCHEDULER_URL = redis_instance_url
logger.debug("****** CELERY_REDIS_SCHEDULER_URL: ", CELERY_REDIS_SCHEDULER_URL)
logger.debug("****** environment: ", environment)
redbeat_redis_url = CELERY_REDIS_SCHEDULER_URL
broker_url = …Run Code Online (Sandbox Code Playgroud)