标签: djcelery

如何正确配置djcelery结果后端到数据库

我正在尝试设置djangocelery来存储数据库中的任务结果.

我设置:

CELERY_RESULT_BACKEND = 'djcelery.backends.database.DatabaseBackend'
Run Code Online (Sandbox Code Playgroud)

然后我同步并迁移了db(没有错误).

Celery正在工作,任务得到处理(我可以得到结果),但是管理员显示没有任务.在数据库中有两个表celery_taskmetadjcelery_taskmeta.第一个是保持结果,第二个显示在管理员中.任何人都有洞察力如何正确配置它?

django celery djcelery

10
推荐指数
2
解决办法
1万
查看次数

Django/djcelery 1.8.2 AppRegistryNotReady:无法初始化翻译基础设施

我收到以下错误:

File "/Library/Python/2.7/site-packages/Django-1.8.2-py2.7.egg/django/utils/translation/trans_real.py", line 164, in _add_installed_apps_translations
"The translation infrastructure cannot be initialized before the "
django.core.exceptions.AppRegistryNotReady: The translation infrastructure cannot be initialized before the apps registry is ready. Check that you don't make non-lazy gettext calls at import time.
Run Code Online (Sandbox Code Playgroud)

我有一个项目,它不是一个django应用程序,而是一个芹菜应用程序.因此,我还没有创建一个wsgi.pymodels.py或通过创建典型的文件django-admin时,一个项目或应用程序已启动.

我只想使用djcelery能够使用djcelery.schedules.DatabaseScheduler此处指定的类似方法创建周期性任务在运行时添加,修改,删除celery.schedules和此处如何动态添加/删除周期性任务到Celery(celerybeat)

这里给出的问题的解决方案(AppRegistryNotReady,使用uWSGI部署时的转换错误)要求我对vassal.ini文件进行更改.我的实现中没有vassal.ini文件.

我将简要介绍一下我的项目 -

proj
  apps.py
  tasks.py
  celeryconfig.py
  runproj.py
Run Code Online (Sandbox Code Playgroud)
  • apps.py
    from celery import Celery
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryconfig')
    myapp = Celery('myapp')
    myapp.config_from_object('celeryconfig')
    if __name__ == '__main__' …
Run Code Online (Sandbox Code Playgroud)

python django celery celerybeat djcelery

8
推荐指数
1
解决办法
748
查看次数

芹菜与djcelery

在尝试在我的django项目上设置芹菜时,我对这两个应用程序之间的差异感到困惑.

两者之间有什么区别?在线阅读教程时,我看到它们都被使用了,我不确定哪种对我最好.似乎djcelery有点像芹菜,但为django量身定做?但是,当djcelery这样做时,celery不需要包含在已安装的应用程序中.

谢谢

python django celery djcelery

7
推荐指数
2
解决办法
4494
查看次数

芹菜击败队列包括过时的任务

我正在使用Django的定期芹菜任务.我以前在app/tasks.py文件中有以下任务:

@periodic_task(run_every=timedelta(minutes=2))
def stuff():
  ...
Run Code Online (Sandbox Code Playgroud)

但是现在这个任务已经从我的app/tasks.py文件中删除了.但是,我一直在芹菜日志中看到这个任务的召唤:

[2013-05-21 07:08:37,963: ERROR/MainProcess] Received unregistered task of type u'app.tasks.stuff'.
Run Code Online (Sandbox Code Playgroud)

似乎我使用的芹菜节拍调度程序不会更新其队列.这是我的project/settings.py文件中定义调度程序的方式:

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Run Code Online (Sandbox Code Playgroud)

重新启动芹菜工人没有帮助.仅供参考,我使用的是Redis经纪人.

如何清除或更新芹菜节拍队列,以便不会将较旧的任务发送给我的芹菜工人?

python django celery django-celery djcelery

7
推荐指数
1
解决办法
1373
查看次数

如果其中一项任务失败,芹菜链会中断

如果其中一项任务失败,那么如果整个连锁店断裂,芹菜链的重点是什么?!!

我有这个芹菜链:

res = chain(workme.s ( y=1111 ), workme2.s( 2222 ), workme3.s( 3333 ),)() 
Run Code Online (Sandbox Code Playgroud)

我使workme2失败,重试如下:

@celery.task(default_retry_delay=5, max_retries = 10, queue="sure") 
def workme2(x,y):
    # try:      
    try:
        print str(y)
        sleep(2)
        print str(x)
        ## adding any condition that makes the task fail
        if x!=None:
            raise Exception('Aproblem from your workme task')
        print 'This is my username: ' + str(x['user']) + \
               ' And Password: ' + str(x['pas'])        
        return "22xx"
    except Exception, exc:
        workme2.retry(args=[x,y], exc=exc,)
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task djcelery

7
推荐指数
1
解决办法
2199
查看次数

如何解决此错误?django + celery + rabbitmq + mysql + redis中的"RestartFreqExceeded:5 in 1s"

所以我用芹菜django.rabbitmq是经纪人.redis是缓存.mysql是db.(localhost中的所有内容)

  1. 我正在使用python2.7并使用基于virtualenv的虚拟环境
  2. 我在默认端口启动redis服务器(本地)
  3. 在一个新的终端,我跑

    python manage.py runserver
    
    Run Code Online (Sandbox Code Playgroud)
  4. 在一个新的终端我开始像这样的芹菜

    celery -A ds_django worker -B -l warning
    
    Run Code Online (Sandbox Code Playgroud)

这个程序过去常常有效.但现在当我运行celery命令时,我得到以下内容

    [2016-07-12 09:15:20,113: CRITICAL/MainProcess] Frequent restarts detected: RestartFreqExceeded('5 in 1s',)
    Traceback (most recent call last):
        File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site- packages/celery/worker/consumer.py", line 285, in start
            self._restart_state.step()
        File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site-packages/billiard/common.py", line 130, in step
            raise self.RestartFreqExceeded("%r in %rs" % (R, self.maxT))
     RestartFreqExceeded: 5 in 1s
Run Code Online (Sandbox Code Playgroud)

我完全不知道为什么会发生这种情况或如何解决这个问题.我一遍又一遍地搜索Google和StackOverFlow,但没有用.

我认为可能发生这种情况的一个原因是,有时我曾经一次经营过4名芹菜工人,然后将他们全部关闭.也许是因为它说重启频率超标,但不确定.而且不知道如何解决这个问题.可能是一些配置文件设置,但哪个设置和哪里也没有任何线索.

如果需要任何其他信息,我会提供.任何帮助表示赞赏.谢谢.

python celery django-celery celerybeat djcelery

7
推荐指数
1
解决办法
1793
查看次数

我该如何在芹菜中实现taskset的回调

我使用celery来启动看起来像这样的任务集:

  1. 我执行一批可以并行运行的任务,这批任务的数量从几十到几千不等.
  2. 我将这些任务的结果汇总到单个答案中,然后用这个答案做一些事情---比如存储到数据库,保存到特殊结果文件等等.基本上在完成任务后我必须调用具有以下签名的函数:

    def callback(result_file_name, task_result_list): 
        #store in file
    
    
    def callback(entity_key, task_result_list):
        #store in db 
    
    Run Code Online (Sandbox Code Playgroud)

现在步骤1.在Celery队列中完成,步骤2在芹菜外部完成:

    tasks = []

    # add taksks to tasks list 

    task_group = group()
    task_group.tasks = tasks

    result = task_group.apply_async()

    res = result.join()

    # Aggregate results 

    # Save results to file, database whatever
Run Code Online (Sandbox Code Playgroud)

这种方法很麻烦,因为我必须停止一个线程,直到执行所有任务(可能需要几个小时).

我想以某种方式将步骤2转移到芹菜上 - 基本上我需要为整个任务集添加一个回调(据我所知,它在Celery中不受支持)或提交在所有这些子任务之后执行的任务.

有谁知道怎么做?我在django环境中使用它,所以我可以在数据库中存储一些状态.

总结一下我最近的发现

和弦不会

我不能直接使用和弦,因为和弦使我能够创建这样的回调:

    def callback(task_result_list): 
        #store in file
Run Code Online (Sandbox Code Playgroud)

没有明显的方法可以将其他参数传递给回调(特别是因为这些回调不能是本地函数).

使用数据库

我可以使用存储结果,TaskSetMeta但是这个实体没有状态字段---所以即使我要向TaskSetMeta添加信号,我也必须汇集任务结果,这可能会产生巨大的开销.

django celery djcelery

6
推荐指数
1
解决办法
2727
查看次数

Python Celery - 如何在其他任务中调用celery任务

我在Django-Celery的任务中调用任务

这是我的任务.

@shared_task
def post_notification(data,url):
    url = "http://posttestserver.com/data/?dir=praful" # when in production, remove this line.
    headers = {'content-type': 'application/json'}
    requests.post(url, data=json.dumps(data), headers=headers)


@shared_task
def shipment_server(data,notification_type):
    notification_obj = Notification.objects.get(name = notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    for server in server_list:
        task = post_notification.delay(data,server.server_id.url)
        print task.status # it prints 'Nonetype' has no attribute id
Run Code Online (Sandbox Code Playgroud)

如何在任务中调用任务? 我在某处可以看到它可以使用group,但我无法形成正确的语法.我该怎么做?

我试过这个

for server in server_list:
    task = group(post_notification.s(data, server.server_id.url))().get()
    print task.status
Run Code Online (Sandbox Code Playgroud)

发出警告说

TxIsolationWarning: Polling results w?                                                                        
ith transaction isolation level repeatable-read within the same transacti?                                                                        
on …
Run Code Online (Sandbox Code Playgroud)

python django celery djcelery

6
推荐指数
2
解决办法
9140
查看次数

python celery - how to add CELERYBEAT_SCHEDULE task at runtime to a worker?

I have created a celery worker with a single celerybeat schedule task which runs at 5 seconds time interval. How can I add another beat task dynamically to the celery worker without stopping it?

Example

app.conf.update(
   CELERY_TASK_RESULT_EXPIRES=3600,
   CELERY_TIMEZONE = 'UTC',
   CELERYBEAT_SCHEDULE = {
    'long-run-5-secs': {
        'task': 'test_proj.tasks.test',
        'schedule': timedelta(seconds=5),
        'args': (16, )
    }
   }
)
Run Code Online (Sandbox Code Playgroud)

With the above configuration, I am able to run the celery worker with beat mode successfully.

Now I need add the below beat schedule dynamically: …

python scheduler celery celerybeat djcelery

6
推荐指数
1
解决办法
3547
查看次数

Celery - “WorkerLostError:Worker 过早退出:信号 11(SIGSEGV)”

我正在本地开发一个 Django 应用程序它需要将 CSV 文件作为输入并对该文件运行一些分析。我在本地运行 Celery、RabbitMQ 和 Web 服务器。当我导入文件时,我在 Celery 服务器上看到以下错误:

[2015-12-11 16:58:53,906: WARNING/MainProcess] celery@Joes-MBP ready.
[2015-12-11 16:59:11,068: ERROR/MainProcess] Task program_manager.tasks.analyze_list_import_program[db22de16-b92f-4220-b2bd-5accf484c99a] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
File "/Users/joefusaro/rl_proto2/venv/lib/python2.7/site-packages/billiard/pool.py", line 1175, in mark_as_worker_lost
human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).
Run Code Online (Sandbox Code Playgroud)

我不知道如何进一步解决这个问题;如果有帮助,我已经从 program_manager/tasks.py 复制了相关代码:

from __future__ import absolute_import

import csv
import rollbar
from celery import shared_task
from celery.utils.log import get_task_logger

from qscore.models import QualityScore
from integrations.salesforce.prepare import read_csv
from …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task django-celery djcelery

6
推荐指数
1
解决办法
5649
查看次数