标签: celery

安排任务每天在特定时间运行一次 | 芹菜

我想安排每天在 16 点 UTC 运行两个任务。

为此,我实现了这个 celery 配置:

from celery.schedules import crontab


CELERY_IMPORTS = ('api.tasks')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'

CELERYBEAT_SCHEDULE = {
    'book-task': {
        'task': 'api.tasks.get_data',
        # At 16h UTC everyday
        'schedule': crontab(hour=16),
        'args': ({'book'}),
    },
    'pencils-task': {
        'task': 'api.tasks.get_data',
        # At 16h UTC everyday
        'schedule': crontab(hour=16),
        'args': ({'pencil'}),
    }
}

Run Code Online (Sandbox Code Playgroud)

celery worker -A app.celery --loglevel=info --pool=solo 跑完后跑去跑芹菜工人celery beat -A app.celery 在运行启动芹菜节拍

通过上面的配置,我有两个任务从 UTC 16 点开始每分钟运行一次。我的配置有什么问题以及如何修复?

python config celery celerybeat

1
推荐指数
1
解决办法
2990
查看次数

如果进程死掉,释放redis锁

我正在使用 Redis 分布式锁来确保某些 celery 任务不会同时运行。为了使用 redis 管理锁,我使用 python redis库版本4.0.0(它指定任何死锁问题都应该由程序员处理)。

由于我想避免使用锁的超时参数,因为任务执行时间变化很大,所以我想知道是否有一种方法可以验证活动锁是否确实由进程拥有,并且它具有没有因为应用程序突然崩溃而无法释放而被阻止。

@shared_task(
    name='test_task',
    queue="test"
)
def test(value):
    lock_acquired = False
    try:
        lock = redis_cli.lock('test_lock', sleep=1)
        lock_acquired = lock.acquire(blocking=False)        
        if lock_acquired:
            print(f"HELLO FROM: {value}")
            sleep(15)
        else:
            print(f"LOCKED")
    except Exception as e:
        print(str(e)) 
    finally:
        if lock_acquired:
            lock.release()
Run Code Online (Sandbox Code Playgroud)

就上面的代码而言,如果应用程序在 sleep (15)期间意外崩溃,则该锁仍将在下次执行时被锁定,即使获取它的进程不再存在。我怎样才能防止这种情况发生?

蟒蛇版本:3.6.8

Redis服务器版本:Redis服务器v=4.0.9 sha=00000000:0 malloc=jemalloc-3.6.0 位=64 build=9435c3c2879311f3

芹菜==5.1.1

python concurrency locking redis celery

1
推荐指数
1
解决办法
1374
查看次数

Redis - 在Amazon EC2上的Celery配置

我正在使用Python,Django和MySql开发Web应用程序.我在应用程序中有一个规定,用户可以上传*.wmv和*.mov文件,系统将处理这些文件并将其转换为*.mp4.我使用的是单一服务器架构,但随着用户群的增长,视频转换占用了90%的内存.我正在考虑使用Amazon EC2在云上移动视频转换/流媒体服务器,以便使用Redis和Celery在不同的服务器上进行转换.我想知道Redis - Celery是否支持多层架构.如果有人成功实现了这一目标,请告诉我相应的步骤.这将是真正有用的,因为我谷歌搜索,但找不到任何支持文档.

python django amazon-ec2 redis celery

0
推荐指数
1
解决办法
2002
查看次数

芹菜击败args:列表与元组

芹菜文档描述了如何通过位置参数,以您的节拍计划任务列表或元组.

我有一个任务,只需一个参数,一个整数列表:

@shared_task
def schedule_by_ids(ids):
  ...
Run Code Online (Sandbox Code Playgroud)

我的celerybeat时间表如下:

CELERYBEAT_SCHEDULE = {
    'schedule_by_ids': {
        'task': 'myproj.app.tasks.schedule_by_ids',
        'schedule': crontab(minute='*/10', hour='8-21'),
        'args': ([1,]),
    },
}
Run Code Online (Sandbox Code Playgroud)

我的任务失败,"int不可迭代" TypeError.根据我的显示器(芹菜花),args传递为[1].

当我将args作为列表时,例如[[1]],arg显示在监视器中,[[1]]并且它工作正常.

我的问题是:当它是一个元组时,它是如何通过args的?为什么?

python celery django-celery celerybeat

0
推荐指数
1
解决办法
1449
查看次数

使用celery将django模型中的保存方法重写为异步的最佳实践

我正在建立一个云系统,我有两个应用程序,包括完整功能的服务器应用程序,以及仅包含输入法的客户端应用程序,所以我在客户分支中安装客户端应用程序作为本地应用程序,

我希望在本地保存模型后覆盖应用程序中的任何模型,我将调用芹菜任务将此模型添加到队列中以确保它将到达,即使互联网已关闭,我将重试直到互联网起步,

现在我希望最佳实践能够以通用的方式对任何模型进行操作

我有两个选择

1-这样的覆盖保存方法

def save(self, *args, **kwargs):
    super(Model, self).save(*args, **kwargs)
    save_task.delay(self)
Run Code Online (Sandbox Code Playgroud)

或使用这样的信号

post_save.connect(save-task.delay, sender=Model)
Run Code Online (Sandbox Code Playgroud)

哪一个是最佳实践,我可以为这个项目的所有模型制作泛型?

django message-queue django-models celery django-celery

0
推荐指数
1
解决办法
1264
查看次数

尝试在Celery中强制执行JSON序列化程序时,ContentDisallowed有关pickle的错误

我在tasks.py中有以下内容,指定json应该用作Celery的默认序列化程序.

celery = Celery('app', broker = 'redis://localhost:6379/4')
from kombu import serialization
serialization.registry._decoders.pop("application/x-python-serialize")

celery.conf.update(
    CELERY_TASK_SERIALIZER = 'json',
    CELERY_RESULT_BACKEND  = 'redis://localhost:6379/4',
    CELERY_ACCEPT_CONTENT  = ['json'],
)
Run Code Online (Sandbox Code Playgroud)

此外,在调用任务时,我将json序列化器指定为:

r = t1.apply_async(kwargs = {'msg': msg}, serializer = 'json')
r = t1.wait()
Run Code Online (Sandbox Code Playgroud)

但是我仍然在从该行t1.wait()开始的回溯调用行中得到以下错误.

ContentDisallowed:拒绝反序列化pickle类型的不可信内容(application/x-python-serialize)

任务t1引发了不同类型的异常,但我确实处理它们并尝试返回一个可以辨认的值.

我正在使用Celery 3.1.17(Cipater)和Flask0.10.1

那究竟是什么导致了这个错误呢?如果需要更多信息,请告诉我.

谢谢 :)

serialization json pickle celery flask

0
推荐指数
1
解决办法
1435
查看次数

Celery worker的日志包含问号(???)而不是正确的unicode字符

我在CentOS 6.5上使用Celery 3.1.18和Python 2.7.8.

在Celery任务模块中,我有以下代码:

# someapp/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task()
def foo():
    logger.info('Test output: %s', u"???")
Run Code Online (Sandbox Code Playgroud)

在这里使用initd脚本来运行Celery worker.我还将以下设置放入/etc/default/celeryd:

CELERYD_NODES="bar"

# %N will be replaced with the first part of the nodename.
CELERYD_LOG_FILE="/var/log/celery/%N.log"

# Workers should run as an unprivileged user.
#   You need to create this user manually (or you can choose
#   a user/group combination that already exists, e.g. nobody).
CELERYD_USER="nobody"
CELERYD_GROUP="nobody"
Run Code Online (Sandbox Code Playgroud)

所以我的日志文件位于/var/log/celery/bar.log …

python unicode logging celery

0
推荐指数
1
解决办法
890
查看次数

TypeError:urlopen()得到一个意外的关键字参数'headers'

我正在使用rest api发送推送通知.文件在这里. 我正在使用金字塔并使用芹菜安排这些推送通知.

这是我的代码示例:

result = urllib2.urlopen(urlRequest, headers={
      "X-Parse-Application-Id": settings["parse.application.id"],
      "X-Parse-REST-API-Key": settings["parse.restapi.key"],
      "Content-Type": "application/json"
     })

connection = httplib.HTTPSConnection('api.parse.com', 443)
connection.connect()

connection.request('POST', '/1/push', json.dumps(data), )
result = json.loads(connection.getresponse().read())
Run Code Online (Sandbox Code Playgroud)

但是芹菜记录了这个错误:

2015-08-18 16:39:45,092 INFO  [celery.worker.strategy][MainThread] Received task: app_v1_1.tasks.push_notification[877906d8-1ea7-4b1f-8a54-aa61bffb40e8]
2015-08-18 16:39:45,094 ERROR [celery.worker.job][MainThread] Task app_v1_1.tasks.push_notification[877906d8-1ea7-4b1f-8a54-aa61bffb40e8] raised unexpected: TypeError("urlopen() got an unexpected keyword argument 'headers'",)
Traceback (most recent call last):
  File "/home/apnistreet/work/ve/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/comp/work/ve/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/comp/work/site/code/apnistreet_v1_1/tasks.py", line …
Run Code Online (Sandbox Code Playgroud)

python urllib2 celery urlopen pyramid

0
推荐指数
1
解决办法
9222
查看次数

Heroku上的烧瓶和芹菜:sqlalchemy.exc.DatabaseError:(psycopg2.DatabaseError)SSL错误:解密失败或错误的记录mac

我正在尝试在heroku上部署Flask应用程序,该应用程序在Celery中使用后台任务。我已经实现了应用程序工厂模式,以便将celery进程不绑定到flask应用程序的任何一个实例。

这在本地有效,但我还没有看到错误。但是,当部署到heroku时,总是会出现相同的结果:celery任务(我只使用一个)在第一次运行时成功,但是对该任务的所有后续celery调用均失败sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) SSL error: decryption failed or bad record mac。如果我重新启动芹菜工人,循环将继续。

多个 问题表明此错误,但没有一个问题指定正确的解决方案。最初,我认为实现应用程序工厂模式将阻止此错误的出现,但还不完全如此。

app/__init__.py我创建celery和db对象中:

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
db = SQLAlchemy()

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    db.init_app(app)
    return app
Run Code Online (Sandbox Code Playgroud)

我的flask_celery.py文件创建了实际的Flask应用程序对象:

import os
from app import celery, create_app

app = create_app(os.getenv('FLASK_CONFIG', 'default'))
app.app_context().push()
Run Code Online (Sandbox Code Playgroud)

我用以下命令开始芹菜: celery worker -A app.flask_celery.celery --loglevel=info

实际的芹菜任务如下所示:

@celery.task()
def task_process_stuff(stuff_id):
    stuff = Stuff.query.get(stuff_id)
    stuff.processed = True
    db.session.add(stuff)
    db.session.commit()
    return stuff
Run Code Online (Sandbox Code Playgroud)

通过以下方式调用:

task_process_stuff.apply_async(args=[stuff.id], countdown=10)
Run Code Online (Sandbox Code Playgroud)

图书馆版本

  • 烧瓶0.12.2 …

python heroku celery flask flask-sqlalchemy

0
推荐指数
1
解决办法
676
查看次数

Django:POST请求成功但后端没有任何内容

我在我的Django应用程序中遇到一个小问题而无法弄清楚它有什么问题.我有一个从前端成功发送的POST请求,django控制台也会点击URL并提供状态代码200.但问题是该视图根本没有被触发.

/urls.py

urlpatterns = [
    url('addition/', views.addition_task, name='addition'),
    url('addition-task-status/', views.addition_task_status, name='addition_task_status'),
    url('', views.algorithm_index, name='algorithm_index'),
    url('outlier/', views.run_outlier_task, name='run_outlier'),
    url('outlier-task-status/', views.outlier_task_status, name='outlier_task_status'),
    ]
Run Code Online (Sandbox Code Playgroud)

/views.py

@csrf_exempt
def run_outlier_task(request):
    print("I'm here")

    if request.method == "POST":
        print("Request is post")
        metric = request.POST["metric_variable"]
        print(metric)
        path = ['MKT', 'CP_MANUFACTURER', 'CP_FRANCHISE', 'CP_BRAND', 'CP_SUBBRAND']
        drivers = ['Cumulative_Distribution_Pts', 'pct_Stores_Selling', 'Baseline_RASP_per_EQ']

        if request.session.get('file_path', None) == None:
            file_name = "anon_cntr_out_br.csv"
        else:
            file_name = request.session.get('file_path', None)

        outlier_task = outlier_algorithm.delay(path, metric, file_name, drivers)
        return HttpResponseRedirect(reverse("outlier_task_status") + "?job_id=" + outlier_task.id)
    else:
        return HttpResponse("GET Request")

def …
Run Code Online (Sandbox Code Playgroud)

python django celery

0
推荐指数
1
解决办法
79
查看次数