我正在尝试设置djangocelery来存储数据库中的任务结果.
我设置:
CELERY_RESULT_BACKEND = 'djcelery.backends.database.DatabaseBackend'
Run Code Online (Sandbox Code Playgroud)
然后我同步并迁移了db(没有错误).
Celery正在工作,任务得到处理(我可以得到结果),但是管理员显示没有任务.在数据库中有两个表celery_taskmeta
和djcelery_taskmeta
.第一个是保持结果,第二个显示在管理员中.任何人都有洞察力如何正确配置它?
我收到以下错误:
File "/Library/Python/2.7/site-packages/Django-1.8.2-py2.7.egg/django/utils/translation/trans_real.py", line 164, in _add_installed_apps_translations
"The translation infrastructure cannot be initialized before the "
django.core.exceptions.AppRegistryNotReady: The translation infrastructure cannot be initialized before the apps registry is ready. Check that you don't make non-lazy gettext calls at import time.
Run Code Online (Sandbox Code Playgroud)
我有一个项目,它不是一个django应用程序,而是一个芹菜应用程序.因此,我还没有创建一个wsgi.py
或models.py
或通过创建典型的文件django-admin
时,一个项目或应用程序已启动.
我只想使用djcelery
能够使用djcelery.schedules.DatabaseScheduler
此处指定的类似方法创建周期性任务在运行时添加,修改,删除celery.schedules和此处如何动态添加/删除周期性任务到Celery(celerybeat)
这里给出的问题的解决方案(AppRegistryNotReady,使用uWSGI部署时的转换错误)要求我对vassal.ini文件进行更改.我的实现中没有vassal.ini文件.
我将简要介绍一下我的项目 -
proj
apps.py
tasks.py
celeryconfig.py
runproj.py
Run Code Online (Sandbox Code Playgroud)
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryconfig')
myapp = Celery('myapp')
myapp.config_from_object('celeryconfig')
if __name__ == '__main__' …
Run Code Online (Sandbox Code Playgroud) 在尝试在我的django项目上设置芹菜时,我对这两个应用程序之间的差异感到困惑.
两者之间有什么区别?在线阅读教程时,我看到它们都被使用了,我不确定哪种对我最好.似乎djcelery有点像芹菜,但为django量身定做?但是,当djcelery这样做时,celery不需要包含在已安装的应用程序中.
谢谢
我正在使用Django的定期芹菜任务.我以前在app/tasks.py文件中有以下任务:
@periodic_task(run_every=timedelta(minutes=2))
def stuff():
...
Run Code Online (Sandbox Code Playgroud)
但是现在这个任务已经从我的app/tasks.py文件中删除了.但是,我一直在芹菜日志中看到这个任务的召唤:
[2013-05-21 07:08:37,963: ERROR/MainProcess] Received unregistered task of type u'app.tasks.stuff'.
Run Code Online (Sandbox Code Playgroud)
似乎我使用的芹菜节拍调度程序不会更新其队列.这是我的project/settings.py文件中定义调度程序的方式:
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Run Code Online (Sandbox Code Playgroud)
重新启动芹菜工人没有帮助.仅供参考,我使用的是Redis经纪人.
如何清除或更新芹菜节拍队列,以便不会将较旧的任务发送给我的芹菜工人?
如果其中一项任务失败,那么如果整个连锁店断裂,芹菜链的重点是什么?!!
我有这个芹菜链:
res = chain(workme.s ( y=1111 ), workme2.s( 2222 ), workme3.s( 3333 ),)()
Run Code Online (Sandbox Code Playgroud)
我使workme2失败,重试如下:
@celery.task(default_retry_delay=5, max_retries = 10, queue="sure")
def workme2(x,y):
# try:
try:
print str(y)
sleep(2)
print str(x)
## adding any condition that makes the task fail
if x!=None:
raise Exception('Aproblem from your workme task')
print 'This is my username: ' + str(x['user']) + \
' And Password: ' + str(x['pas'])
return "22xx"
except Exception, exc:
workme2.retry(args=[x,y], exc=exc,)
Run Code Online (Sandbox Code Playgroud) 所以我用芹菜django.rabbitmq是经纪人.redis是缓存.mysql是db.(localhost中的所有内容)
在一个新的终端,我跑
python manage.py runserver
Run Code Online (Sandbox Code Playgroud)在一个新的终端我开始像这样的芹菜
celery -A ds_django worker -B -l warning
Run Code Online (Sandbox Code Playgroud)这个程序过去常常有效.但现在当我运行celery命令时,我得到以下内容
[2016-07-12 09:15:20,113: CRITICAL/MainProcess] Frequent restarts detected: RestartFreqExceeded('5 in 1s',)
Traceback (most recent call last):
File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site- packages/celery/worker/consumer.py", line 285, in start
self._restart_state.step()
File "/Users/user/Desktop/ds-django/ds_django/newDs/lib/python2.7/site-packages/billiard/common.py", line 130, in step
raise self.RestartFreqExceeded("%r in %rs" % (R, self.maxT))
RestartFreqExceeded: 5 in 1s
Run Code Online (Sandbox Code Playgroud)
我完全不知道为什么会发生这种情况或如何解决这个问题.我一遍又一遍地搜索Google和StackOverFlow,但没有用.
我认为可能发生这种情况的一个原因是,有时我曾经一次经营过4名芹菜工人,然后将他们全部关闭.也许是因为它说重启频率超标,但不确定.而且不知道如何解决这个问题.可能是一些配置文件设置,但哪个设置和哪里也没有任何线索.
如果需要任何其他信息,我会提供.任何帮助表示赞赏.谢谢.
我使用celery来启动看起来像这样的任务集:
我将这些任务的结果汇总到单个答案中,然后用这个答案做一些事情---比如存储到数据库,保存到特殊结果文件等等.基本上在完成任务后我必须调用具有以下签名的函数:
def callback(result_file_name, task_result_list):
#store in file
def callback(entity_key, task_result_list):
#store in db
Run Code Online (Sandbox Code Playgroud)现在步骤1.在Celery队列中完成,步骤2在芹菜外部完成:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
Run Code Online (Sandbox Code Playgroud)
这种方法很麻烦,因为我必须停止一个线程,直到执行所有任务(可能需要几个小时).
我想以某种方式将步骤2转移到芹菜上 - 基本上我需要为整个任务集添加一个回调(据我所知,它在Celery中不受支持)或提交在所有这些子任务之后执行的任务.
有谁知道怎么做?我在django环境中使用它,所以我可以在数据库中存储一些状态.
我不能直接使用和弦,因为和弦使我能够创建这样的回调:
def callback(task_result_list):
#store in file
Run Code Online (Sandbox Code Playgroud)
没有明显的方法可以将其他参数传递给回调(特别是因为这些回调不能是本地函数).
我可以使用存储结果,TaskSetMeta
但是这个实体没有状态字段---所以即使我要向TaskSetMeta添加信号,我也必须汇集任务结果,这可能会产生巨大的开销.
我在Django-Celery的任务中调用任务
这是我的任务.
@shared_task
def post_notification(data,url):
url = "http://posttestserver.com/data/?dir=praful" # when in production, remove this line.
headers = {'content-type': 'application/json'}
requests.post(url, data=json.dumps(data), headers=headers)
@shared_task
def shipment_server(data,notification_type):
notification_obj = Notification.objects.get(name = notification_type)
server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)
for server in server_list:
task = post_notification.delay(data,server.server_id.url)
print task.status # it prints 'Nonetype' has no attribute id
Run Code Online (Sandbox Code Playgroud)
如何在任务中调用任务?
我在某处可以看到它可以使用group
,但我无法形成正确的语法.我该怎么做?
我试过这个
for server in server_list:
task = group(post_notification.s(data, server.server_id.url))().get()
print task.status
Run Code Online (Sandbox Code Playgroud)
发出警告说
TxIsolationWarning: Polling results w?
ith transaction isolation level repeatable-read within the same transacti?
on …
Run Code Online (Sandbox Code Playgroud) I have created a celery worker with a single celerybeat schedule task which runs at 5 seconds time interval. How can I add another beat task dynamically to the celery worker without stopping it?
Example
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_TIMEZONE = 'UTC',
CELERYBEAT_SCHEDULE = {
'long-run-5-secs': {
'task': 'test_proj.tasks.test',
'schedule': timedelta(seconds=5),
'args': (16, )
}
}
)
Run Code Online (Sandbox Code Playgroud)
With the above configuration, I am able to run the celery worker with beat mode successfully.
Now I need add the below beat schedule dynamically: …
我正在本地开发一个 Django 应用程序,它需要将 CSV 文件作为输入并对该文件运行一些分析。我在本地运行 Celery、RabbitMQ 和 Web 服务器。当我导入文件时,我在 Celery 服务器上看到以下错误:
[2015-12-11 16:58:53,906: WARNING/MainProcess] celery@Joes-MBP ready.
[2015-12-11 16:59:11,068: ERROR/MainProcess] Task program_manager.tasks.analyze_list_import_program[db22de16-b92f-4220-b2bd-5accf484c99a] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
File "/Users/joefusaro/rl_proto2/venv/lib/python2.7/site-packages/billiard/pool.py", line 1175, in mark_as_worker_lost
human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).
Run Code Online (Sandbox Code Playgroud)
我不知道如何进一步解决这个问题;如果有帮助,我已经从 program_manager/tasks.py 复制了相关代码:
from __future__ import absolute_import
import csv
import rollbar
from celery import shared_task
from celery.utils.log import get_task_logger
from qscore.models import QualityScore
from integrations.salesforce.prepare import read_csv
from …
Run Code Online (Sandbox Code Playgroud)