我正在使用celery和rabbitmq,但是由于在队列中推送了多个任务,我的服务器内存利用率变得超过40%,因此rabbit将不再接受任何任务。所以我想删除那些已经执行的消息,但是由于rabbitmq的持久行为,这些消息不会自动删除,所以我想设置一些配置,例如 autoAck=True ,这样如果从 celery 消耗消息,它将从rabbitmq 队列以及我的服务器内存。请解释一下我们该如何做到这一点。
当我尝试启动工作程序时,出现一个问题:
ImportError:没有名为“ project”的模块
追溯(最近一次通话):
File "/usr/local/bin/celery", line 11, in <module>
sys.exit(main())
File "/usr/local/lib/python3.5/dist-packages/celery/__main__.py", line 16, in main
_main()
File "/usr/local/lib/python3.5/dist-packages/celery/bin/celery.py", line 322, in main
cmd.execute_from_commandline(argv)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/celery.py", line 496, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 273, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 479, in setup_app_from_commandline
self.app = self.find_app(app)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 501, in find_app
return find_app(app, symbol_by_name=self.symbol_by_name)
File "/usr/local/lib/python3.5/dist-packages/celery/app/utils.py", line 359, in find_app
sym = symbol_by_name(app, imp=imp)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 504, in symbol_by_name
return imports.symbol_by_name(name, imp=imp)
File …Run Code Online (Sandbox Code Playgroud) 撤销 @periodic_task 发送的任务
Discarding revoked tasks & Due task to workers.
[2018-09-17 12:23:50,864: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:23:50,864: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add)
[2018-09-17 12:24:00,869: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add)
[2018-09-17 12:24:10,868: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
任务.py
@periodic_task(run_every=timedelta(seconds=10),options={"task_id":"xxxxxxx"})
def add():
call(["ping","-c10","google.com"])
def stop():
x = revoke("xxxxxxx",terminate=True,signal="KILL")
print(x) …Run Code Online (Sandbox Code Playgroud) 我有正在运行的 Celery 3.1 应用程序,它记录了一些敏感信息。理想情况下,我希望有相同的日志,但没有结果部分。
目前它看起来像:
worker_1 | [2019-12-10 13:46:40,052: INFO/MainProcess] Task xxxxx succeeded in 13.19569299298746s: yyyyyyy
Run Code Online (Sandbox Code Playgroud)
我想拥有:
worker_1 | [2019-12-10 13:46:40,052: INFO/MainProcess] Task xxxxx succeeded in 13.19569299298746s
Run Code Online (Sandbox Code Playgroud)
怎么做?
编辑:看来这可以完成这项工作:https://docs.celeryproject.org/en/3.1/reference/celery.worker.job.html#celery.worker.job.Request.success_msg但我不知道如何来实际使用它。
我正在尝试将运行 Celery 的 Django 应用程序从运行 Python 3.6 的 Amazon Elastic Beanstalk Linux 1 升级到使用 Python 3.8 的 Amazon Linux 2。
我在使用 Celery 应用程序时遇到问题。
在Linux 1中我有以下文件
#!/usr/bin/env bash
# Get django environment variables
celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`
celeryenv=${celeryenv%?}
# Create celery configuraiton script
celeryconf="[program:celeryd-worker]
; Set full path to celery program if using virtualenv
command=/opt/python/run/venv/bin/celery -A core worker -P solo --loglevel=INFO -n worker.%%h
directory=/opt/python/current/app/src
user=nobody
numprocs=1 …Run Code Online (Sandbox Code Playgroud) 我对 Django Celery 有以下任务(使用 Amazon SQS)。
@task
def upload_task(request, **kwargs):
file = request.FILES['file']
ContactCSVModel.import_from_file(file)
return True
Run Code Online (Sandbox Code Playgroud)
这似乎有效,即文件已成功添加到我的数据库中,但我不确定它是否使用 Celery。我怎么知道这是否有效。我应该看点什么吗?在终端我跑了...
manage.py celery worker --loglevel=info,它指出我有一个名为 的任务contacts.tasks.upload_task,但我从未看到任何事情发生,它只是指出...
[2013-03-14 20:52:47,947:INFO/MainProcess]消费者:连接到 sqs://AJSUQJZKNSJA81JM@localhost//
知道我的任务是否已运行并完成(是的,我知道它已完成,因为它在数据库中,但是通过 Celery 实现的吗?)
这是任务运行的地方
视图.py
@login_required
def upload(request):
# If we had a POST then get the request post values.
if request.method == 'POST':
form = ContactUploadForm(request.POST, request.FILES)
# Check we have valid data
if form.is_valid():
upload_task(request)
#file = request.FILES['file']
#ContactCSVModel.import_from_file(file)
messages.add_message(request, messages.SUCCESS, 'Items have been added')
else:
messages.add_message(request, messages.ERROR, ' …Run Code Online (Sandbox Code Playgroud) 将芹菜文档描述了如何通过位置参数,以您的节拍计划任务列表或元组.
我有一个任务,只需一个参数,一个整数列表:
@shared_task
def schedule_by_ids(ids):
...
Run Code Online (Sandbox Code Playgroud)
我的celerybeat时间表如下:
CELERYBEAT_SCHEDULE = {
'schedule_by_ids': {
'task': 'myproj.app.tasks.schedule_by_ids',
'schedule': crontab(minute='*/10', hour='8-21'),
'args': ([1,]),
},
}
Run Code Online (Sandbox Code Playgroud)
我的任务失败,"int不可迭代" TypeError.根据我的显示器(芹菜花),args传递为[1].
当我将args作为列表时,例如[[1]],arg显示在监视器中,[[1]]并且它工作正常.
我的问题是:当它是一个元组时,它是如何通过args的?为什么?
我正在建立一个云系统,我有两个应用程序,包括完整功能的服务器应用程序,以及仅包含输入法的客户端应用程序,所以我在客户分支中安装客户端应用程序作为本地应用程序,
我希望在本地保存模型后覆盖应用程序中的任何模型,我将调用芹菜任务将此模型添加到队列中以确保它将到达,即使互联网已关闭,我将重试直到互联网起步,
现在我希望最佳实践能够以通用的方式对任何模型进行操作
我有两个选择
1-这样的覆盖保存方法
def save(self, *args, **kwargs):
super(Model, self).save(*args, **kwargs)
save_task.delay(self)
Run Code Online (Sandbox Code Playgroud)
或使用这样的信号
post_save.connect(save-task.delay, sender=Model)
Run Code Online (Sandbox Code Playgroud)
哪一个是最佳实践,我可以为这个项目的所有模型制作泛型?
对于 Django 项目来说,创建基于函数的任务非常干净。只需在 django 应用程序中创建tasks.py并开始编写任务,就像这个示例一样,该示例取自官方celery文档:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
Run Code Online (Sandbox Code Playgroud)
但有时基于函数的任务是紧密耦合的并且可重用性不高。所以我想创建基于类的芹菜任务,该任务记录在官方网站中。在遵循https://github.com/celery/celery/issues/3874之后,我可以创建示例任务,但我不确定它是否是创建基于类的任务的正确方法。
from __future__ import absolute_import, unicode_literals
from celery import shared_task, Task
import time
from celery import current_app
@shared_task
def add(x, y):
time.sleep(5)
return x + y
@shared_task
def mul(x, y):
return x * y
# Sample class based task for testing
class AnotherTask(current_app.Task):
name = 'tasks.another_task'
def …Run Code Online (Sandbox Code Playgroud) 我想创建一个第三方聊天机器人 API,它是异步的,并在 10 秒暂停后回复“ok”。
import time
def wait():
time.sleep(10)
return "ok"
# views.py
def api(request):
return wait()
Run Code Online (Sandbox Code Playgroud)
我已经尝试了 celery,如下所示,我正在等待 celery 响应:
import time
from celery import shared_task
@shared_task
def wait():
time.sleep(10)
return "ok"
# views.py
def api(request):
a = wait.delay()
work = AsyncResult(a.id)
while True:
if work.ready():
return work.get(timeout=1)
Run Code Online (Sandbox Code Playgroud)
但是这个解决方案是同步工作的,没有区别。我们如何在不要求用户继续请求直到收到结果的情况下使其异步?
在我当前的项目中,我需要做的是从 700 多个端点获取数据,然后将该数据发送到另外 700 多个端点。我的方法是使用 Django Celery Redis,在每个工作人员上放置 70 多个端点,以便有大约 10 个工作人员将检索数据然后发布数据。
为此,我使用 Chord 执行并行任务,然后计算所需的时间。
问题是 Celery 多次运行相同的任务。task_get_data是主要方法,首先获取网站列表,然后将其分成每组 70 个,然后task_post_data使用 Chord 调用。
在下面的输出中website_A,您可以website_B多次看到 等,我已经手动检查了我的数据和所有内容,并且没有重复的网站,但是当提交 celery 任务时,会创建多个条目。
另外,有没有办法监控工人的数量以及他们正在处理什么?
下面是代码
os.environ.setdefault('DJANGO_SETTINGS_MODULE','django_backend.settings')
app = Celery('django_backend', backend='redis://localhost:6379', broker='redis://localhost:6379')
app.config_from_object('django.conf:settings', namespace='CELERY')
# app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
def post_data(json_obj, website):
for items in json_obj:
md = md + items['data']
n = 50
list_of_objects = [md[i:i+n] for i in range(0, len(md), n)]
print("Starting to post …Run Code Online (Sandbox Code Playgroud) django-celery ×11
celery ×9
django ×8
python ×6
celery-task ×3
celerybeat ×2
asynchronous ×1
linux ×1
rabbitmq ×1
task ×1