我想安排每天在 16 点 UTC 运行两个任务。
为此,我实现了这个 celery 配置:
from celery.schedules import crontab
CELERY_IMPORTS = ('api.tasks')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
'book-task': {
'task': 'api.tasks.get_data',
# At 16h UTC everyday
'schedule': crontab(hour=16),
'args': ({'book'}),
},
'pencils-task': {
'task': 'api.tasks.get_data',
# At 16h UTC everyday
'schedule': crontab(hour=16),
'args': ({'pencil'}),
}
}
Run Code Online (Sandbox Code Playgroud)
我celery worker -A app.celery --loglevel=info --pool=solo
跑完后跑去跑芹菜工人celery beat -A app.celery
在运行启动芹菜节拍
通过上面的配置,我有两个任务从 UTC 16 点开始每分钟运行一次。我的配置有什么问题以及如何修复?
我正在使用 Redis 分布式锁来确保某些 celery 任务不会同时运行。为了使用 redis 管理锁,我使用 python redis库版本4.0.0(它指定任何死锁问题都应该由程序员处理)。
由于我想避免使用锁的超时参数,因为任务执行时间变化很大,所以我想知道是否有一种方法可以验证活动锁是否确实由进程拥有,并且它具有没有因为应用程序突然崩溃而无法释放而被阻止。
@shared_task(
name='test_task',
queue="test"
)
def test(value):
lock_acquired = False
try:
lock = redis_cli.lock('test_lock', sleep=1)
lock_acquired = lock.acquire(blocking=False)
if lock_acquired:
print(f"HELLO FROM: {value}")
sleep(15)
else:
print(f"LOCKED")
except Exception as e:
print(str(e))
finally:
if lock_acquired:
lock.release()
Run Code Online (Sandbox Code Playgroud)
就上面的代码而言,如果应用程序在 sleep (15)期间意外崩溃,则该锁仍将在下次执行时被锁定,即使获取它的进程不再存在。我怎样才能防止这种情况发生?
蟒蛇版本:3.6.8
Redis服务器版本:Redis服务器v=4.0.9 sha=00000000:0 malloc=jemalloc-3.6.0 位=64 build=9435c3c2879311f3
芹菜==5.1.1
我正在使用Python,Django和MySql开发Web应用程序.我在应用程序中有一个规定,用户可以上传*.wmv和*.mov文件,系统将处理这些文件并将其转换为*.mp4.我使用的是单一服务器架构,但随着用户群的增长,视频转换占用了90%的内存.我正在考虑使用Amazon EC2在云上移动视频转换/流媒体服务器,以便使用Redis和Celery在不同的服务器上进行转换.我想知道Redis - Celery是否支持多层架构.如果有人成功实现了这一目标,请告诉我相应的步骤.这将是真正有用的,因为我谷歌搜索,但找不到任何支持文档.
将芹菜文档描述了如何通过位置参数,以您的节拍计划任务列表或元组.
我有一个任务,只需一个参数,一个整数列表:
@shared_task
def schedule_by_ids(ids):
...
Run Code Online (Sandbox Code Playgroud)
我的celerybeat时间表如下:
CELERYBEAT_SCHEDULE = {
'schedule_by_ids': {
'task': 'myproj.app.tasks.schedule_by_ids',
'schedule': crontab(minute='*/10', hour='8-21'),
'args': ([1,]),
},
}
Run Code Online (Sandbox Code Playgroud)
我的任务失败,"int不可迭代" TypeError
.根据我的显示器(芹菜花),args传递为[1]
.
当我将args作为列表时,例如[[1]]
,arg显示在监视器中,[[1]]
并且它工作正常.
我的问题是:当它是一个元组时,它是如何通过args的?为什么?
我正在建立一个云系统,我有两个应用程序,包括完整功能的服务器应用程序,以及仅包含输入法的客户端应用程序,所以我在客户分支中安装客户端应用程序作为本地应用程序,
我希望在本地保存模型后覆盖应用程序中的任何模型,我将调用芹菜任务将此模型添加到队列中以确保它将到达,即使互联网已关闭,我将重试直到互联网起步,
现在我希望最佳实践能够以通用的方式对任何模型进行操作
我有两个选择
1-这样的覆盖保存方法
def save(self, *args, **kwargs):
super(Model, self).save(*args, **kwargs)
save_task.delay(self)
Run Code Online (Sandbox Code Playgroud)
或使用这样的信号
post_save.connect(save-task.delay, sender=Model)
Run Code Online (Sandbox Code Playgroud)
哪一个是最佳实践,我可以为这个项目的所有模型制作泛型?
我在tasks.py中有以下内容,指定json应该用作Celery的默认序列化程序.
celery = Celery('app', broker = 'redis://localhost:6379/4')
from kombu import serialization
serialization.registry._decoders.pop("application/x-python-serialize")
celery.conf.update(
CELERY_TASK_SERIALIZER = 'json',
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4',
CELERY_ACCEPT_CONTENT = ['json'],
)
Run Code Online (Sandbox Code Playgroud)
此外,在调用任务时,我将json序列化器指定为:
r = t1.apply_async(kwargs = {'msg': msg}, serializer = 'json')
r = t1.wait()
Run Code Online (Sandbox Code Playgroud)
但是我仍然在从该行t1.wait()
开始的回溯调用行中得到以下错误.
ContentDisallowed:拒绝反序列化pickle类型的不可信内容(application/x-python-serialize)
任务t1引发了不同类型的异常,但我确实处理它们并尝试返回一个可以辨认的值.
我正在使用Celery 3.1.17
(Cipater)和Flask0.10.1
那究竟是什么导致了这个错误呢?如果需要更多信息,请告诉我.
谢谢 :)
我在CentOS 6.5上使用Celery 3.1.18和Python 2.7.8.
在Celery任务模块中,我有以下代码:
# someapp/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task()
def foo():
logger.info('Test output: %s', u"???")
Run Code Online (Sandbox Code Playgroud)
我在这里使用initd脚本来运行Celery worker.我还将以下设置放入/etc/default/celeryd
:
CELERYD_NODES="bar"
# %N will be replaced with the first part of the nodename.
CELERYD_LOG_FILE="/var/log/celery/%N.log"
# Workers should run as an unprivileged user.
# You need to create this user manually (or you can choose
# a user/group combination that already exists, e.g. nobody).
CELERYD_USER="nobody"
CELERYD_GROUP="nobody"
Run Code Online (Sandbox Code Playgroud)
所以我的日志文件位于/var/log/celery/bar.log …
我正在使用rest api发送推送通知.文件在这里. 我正在使用金字塔并使用芹菜安排这些推送通知.
这是我的代码示例:
result = urllib2.urlopen(urlRequest, headers={
"X-Parse-Application-Id": settings["parse.application.id"],
"X-Parse-REST-API-Key": settings["parse.restapi.key"],
"Content-Type": "application/json"
})
connection = httplib.HTTPSConnection('api.parse.com', 443)
connection.connect()
connection.request('POST', '/1/push', json.dumps(data), )
result = json.loads(connection.getresponse().read())
Run Code Online (Sandbox Code Playgroud)
但是芹菜记录了这个错误:
2015-08-18 16:39:45,092 INFO [celery.worker.strategy][MainThread] Received task: app_v1_1.tasks.push_notification[877906d8-1ea7-4b1f-8a54-aa61bffb40e8]
2015-08-18 16:39:45,094 ERROR [celery.worker.job][MainThread] Task app_v1_1.tasks.push_notification[877906d8-1ea7-4b1f-8a54-aa61bffb40e8] raised unexpected: TypeError("urlopen() got an unexpected keyword argument 'headers'",)
Traceback (most recent call last):
File "/home/apnistreet/work/ve/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/comp/work/ve/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/home/comp/work/site/code/apnistreet_v1_1/tasks.py", line …
Run Code Online (Sandbox Code Playgroud) 我正在尝试在heroku上部署Flask应用程序,该应用程序在Celery中使用后台任务。我已经实现了应用程序工厂模式,以便将celery进程不绑定到flask应用程序的任何一个实例。
这在本地有效,但我还没有看到错误。但是,当部署到heroku时,总是会出现相同的结果:celery任务(我只使用一个)在第一次运行时成功,但是对该任务的所有后续celery调用均失败sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) SSL error: decryption failed or bad record mac
。如果我重新启动芹菜工人,循环将继续。
有多个 问题表明此错误,但没有一个问题指定正确的解决方案。最初,我认为实现应用程序工厂模式将阻止此错误的出现,但还不完全如此。
在app/__init__.py
我创建celery和db对象中:
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
db = SQLAlchemy()
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
return app
Run Code Online (Sandbox Code Playgroud)
我的flask_celery.py
文件创建了实际的Flask应用程序对象:
import os
from app import celery, create_app
app = create_app(os.getenv('FLASK_CONFIG', 'default'))
app.app_context().push()
Run Code Online (Sandbox Code Playgroud)
我用以下命令开始芹菜:
celery worker -A app.flask_celery.celery --loglevel=info
实际的芹菜任务如下所示:
@celery.task()
def task_process_stuff(stuff_id):
stuff = Stuff.query.get(stuff_id)
stuff.processed = True
db.session.add(stuff)
db.session.commit()
return stuff
Run Code Online (Sandbox Code Playgroud)
通过以下方式调用:
task_process_stuff.apply_async(args=[stuff.id], countdown=10)
Run Code Online (Sandbox Code Playgroud)
图书馆版本
我在我的Django应用程序中遇到一个小问题而无法弄清楚它有什么问题.我有一个从前端成功发送的POST请求,django控制台也会点击URL并提供状态代码200.但问题是该视图根本没有被触发.
/urls.py
urlpatterns = [
url('addition/', views.addition_task, name='addition'),
url('addition-task-status/', views.addition_task_status, name='addition_task_status'),
url('', views.algorithm_index, name='algorithm_index'),
url('outlier/', views.run_outlier_task, name='run_outlier'),
url('outlier-task-status/', views.outlier_task_status, name='outlier_task_status'),
]
Run Code Online (Sandbox Code Playgroud)
/views.py
@csrf_exempt
def run_outlier_task(request):
print("I'm here")
if request.method == "POST":
print("Request is post")
metric = request.POST["metric_variable"]
print(metric)
path = ['MKT', 'CP_MANUFACTURER', 'CP_FRANCHISE', 'CP_BRAND', 'CP_SUBBRAND']
drivers = ['Cumulative_Distribution_Pts', 'pct_Stores_Selling', 'Baseline_RASP_per_EQ']
if request.session.get('file_path', None) == None:
file_name = "anon_cntr_out_br.csv"
else:
file_name = request.session.get('file_path', None)
outlier_task = outlier_algorithm.delay(path, metric, file_name, drivers)
return HttpResponseRedirect(reverse("outlier_task_status") + "?job_id=" + outlier_task.id)
else:
return HttpResponse("GET Request")
def …
Run Code Online (Sandbox Code Playgroud)