我在本地主机上使用 Celery + Redis 和 Django Rest API 来运行分类任务,如何从 Axios 帖子获取数据。现在我正在尝试将其部署到谷歌云,并且我没有找到在App Engine上运行Redis和Celery的明确方法,所以我听说了Google任务队列,但我没有找到将其添加到视图和触发器的方法当视图被调用时,我如何创建一个函数来调用我在 celery 上的这个谷歌云任务,或者只是知道如何做到这一点,这些是我的代码:
from celery import shared_task
from celery_progress.backend import ProgressRecorder
from snakeimage.models import Prediction,UploadedSnake,SnakeClass
from snakeimage.classification_codes.classification_codes.prediction_func import predict_classes
#import json
#import time
#from django.conf import settings
#from google.cloud import tasks_v2beta3
#from google.protobuf import timestamp_pb2
@shared_task(bind=True)
def image_progress(self,image_path, X, Y, metadata,image_id):
progress_recorder = ProgressRecorder(self,)
predictions = predict_classes(image_path, X, Y, metadata)
print(predictions)
for prediction in predictions:
print(prediction[0])
image = UploadedSnake.objects.get(id=image_id)
class_name = SnakeClass.objects.get(index=(prediction[0]+1))
print('>>>>>>>>>>>>>>>>>>>>>',prediction[1])
Prediction.objects.create(image=image,class_name=class_name,predict_percent=prediction[1])
progress_recorder.set_progress( 1, 3, description='Prediction Result …Run Code Online (Sandbox Code Playgroud) google-app-engine django-views celery django-celery google-cloud-tasks
我有使用 Celery 版本 4.4.2 的 Django 应用程序,它运行良好。
from celery import task
import logging
@task(ignore_result=True)
def log_user_activity(user_id):
try:
logging.info(user_id)
except Exception as e:
logging.error(str(e))
Run Code Online (Sandbox Code Playgroud)
当我尝试将 Celery 版本更新到 v5.2.2 时,出现以下错误:
ImportError:无法从“celery”导入名称“task”
有人可以帮忙任务被替换成什么吗?他们这里还有同样的例子。 https://github.com/celery/celery/blob/v5.2.2/examples/celery_http_gateway/tasks.py
我有这个错误A rediss:// URL must have parameter ssl_cert_reqs and this must be set to CERT_REQUIRED, CERT_OPTIONAL, or CERT_NONE
CELERY_BROKER_URL = os.environ.get('REDIS_URL', "redis://localhost:6379")
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL', "redis://localhost:6379")
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
Run Code Online (Sandbox Code Playgroud)
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os, ssl
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')
app = Celery(
'myproj',
broker_use_ssl = {
'ssl_cert_reqs': ssl.CERT_NONE
})
app.config_from_object('django.conf:settings', namespace="CELERY")
app.autodiscover_tasks()
Run Code Online (Sandbox Code Playgroud)
我将 ssl_cert_reqs 的值更改为不同的值“none”,“cert_required”,...,但什么也没有,当我使用 rediss:// 而不是 redis:// 时总是出现相同的错误。
我目前正在设置一个系统,它使用带有redis后端的芹菜来执行一系列异步任务,例如发送电子邮件,提取社交数据,抓取等等.一切都很好,但我正在寻找如何监控系统(也就是排队消息的数量).我开始浏览芹菜来源,但我想我会在这里发布我的问题:首先,这是我的配置:
BROKER_BACKEND = "redis"
BROKER_HOST = "localhost"
BROKER_PORT = 6379
BROKER_VHOST = "1"
REDIS_CONNECT_RETRY = True
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = "0"
CELERY_SEND_EVENTS = True
CELERYD_LOG_LEVEL = 'INFO'
CELERY_RESULT_BACKEND = "redis"
CELERY_TASK_RESULT_EXPIRES = 25
CELERYD_CONCURRENCY = 8
CELERYD_MAX_TASKS_PER_CHILD = 10
CELERY_ALWAYS_EAGER =True
Run Code Online (Sandbox Code Playgroud)
我要做的第一件事是监视队列中有多少消息.我假设,在幕后,redis后端只是从列表中推送/弹出,虽然我似乎无法在代码中找到它.所以我模拟了一个模拟,我开始大约100个任务,并试图在redis中找到它们:我的celeryd运行如下:python manage.py celeryd -c 4 --loglevel = DEBUG -n XXXXX --logfile = logs/celery.log所以我应该一次只有4个并发工作者......我不明白的两件事:问题1:排队完成100个任务后,在redis上查找它们,我只看到以下内容:
$ redis-cli
redis 127.0.0.1:6379> keys *
1) "_kombu.binding.celery"
redis 127.0.0.1:6379> select 1
OK
redis 127.0.0.1:6379[1]> keys *
1) "_kombu.binding.celery"
2) "_kombu.binding.celeryd.pidbox"
redis …Run Code Online (Sandbox Code Playgroud) 我正在开发环境中运行,所以这在生产中可能有所不同,但是当我从Django Celery运行任务时,它似乎每隔10-20秒才从代理中获取任务.我只是在这一点上进行测试但是假设我发送大约1000个任务,这意味着它需要花费5个多小时才能完成.
这是正常的吗?应该更快吗?或者我做错了什么?
这是我的任务
class SendMessage(Task):
name = "Sending SMS"
max_retries = 10
default_retry_delay = 3
def run(self, message_id, gateway_id=None, **kwargs):
logging.debug("About to send a message.")
# Because we don't always have control over transactions
# in our calling code, we will retry up to 10 times, every 3
# seconds, in order to try to allow for the commit to the database
# to finish. That gives the server 30 seconds to write all of
# the data to …Run Code Online (Sandbox Code Playgroud) 我正在使用Celery,Django和RabbitMQ。我正在寻找一种自动从RabbitMQ的celery队列中清除旧邮件的方法。因此,无论何时默认情况下创建队列,都应清除所有早于x秒的消息。
这些是我在settings.py中设置的变量。
CELERY_DEFAULT_EXCHANGE = 'celery'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'celery'
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_EVENT_QUEUE_TTL= 30
Run Code Online (Sandbox Code Playgroud)
我也尝试过这样做,但是导致没有消息发送到RabbitMQ。
CELERY_QUEUES = (
Queue('celery', routing_key='celery', queue_arguments={'x-message-ttl': 30}),
Queue('default', routing_key='default', queue_arguments={'x-message-ttl': 30}),
)
Run Code Online (Sandbox Code Playgroud)
任何想法都欢迎。
我是第一次将远程工作人员连接到我的Celery服务器(Django).在我的服务器上,我为用户创建了一个新的用户名和密码,并设置了权限:
# rabbitmqctl add_user adcelery pwd
# rabbitmqctl set_permissions adcelery "^adcelery-.*" ".*" ".*"
# rabbitmqctl list_users
Listing users ...
guest [administrator]
adcelery []
...done.
# /etc/init.d/rabbitmq-server restart
# /etc/init.d/celeryd restart
Run Code Online (Sandbox Code Playgroud)
我的远程工作者的URL:
BROKER_URL = "amqp://adcelery:pwd@mydomain.com/"
Run Code Online (Sandbox Code Playgroud)
我在远程工作者的启动时收到以下错误.当我在BROKER_URL上面设置"guest:guest"作为我的登录时,它连接完全正常.我确定我错过了一两步,有什么建议吗?
[2014-01-12 11:31:26,188: INFO/MainProcess] Connected to amqp://adcelery@awaaz.de:5672//
[2014-01-12 11:31:26,391: ERROR/MainProcess] Unrecoverable error: AccessRefused(403, u"ACCESS_REFUSED - access to exchange 'celeryev' in vhost '/' refused f
or user 'adcelery'", (40, 10), 'Exchange.declare')
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 206, in start
self.blueprint.start(self)
File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", …Run Code Online (Sandbox Code Playgroud) 我想使用.delay来实现异步行为.使用它的主要原因是为了加快我的观点.我做错了吗?如果是这样,我该怎么做呢?
以下是示例代码:
View.py
@cache_page(60*60*24)
def my_view(request):
something ..... .... ....
a = SomeModel.objects.get(pk=id)
data = celery_task.delay(a)
return dumpjson(status='ok', data=data, callback=callback)
Run Code Online (Sandbox Code Playgroud)
Task.py
def celery_task(a):
res = request.get('http:sample.sample.com/feed/result' params={'abc': 'abc'})
return {'response': res}
Run Code Online (Sandbox Code Playgroud)
如果我从celery_task带来响应,它会显示一些guid(1b52f519-64cb-43da-844a-2886bcccb9bc),错误是这样的:
<EagerResult: 1b52f519-64cb-43da-844a-2886bcccb9bc> is not JSON serializable
Run Code Online (Sandbox Code Playgroud) 我必须产生芹菜任务,它必须有一些命名空间(例如用户ID).所以我产生了它
scrapper_start.apply_async((request.user.id,), queue=account.Account_username)
app.control.add_consumer(account.Account_username, reply=True)
Run Code Online (Sandbox Code Playgroud)
并且任务从其他任务递归地产生.现在我必须检查队列的任务是否正在执行.试图检查redis中的列表长度,它在celery开始执行之前返回真实数字.如何解决这个问题呢.我只需要检查,如果队列或消费者正在执行或已经为空.谢谢
我正在使用django + celery进行定期任务.我的项目有django-celery == 3.1.17,使用的代理是Redis,它正在工作.
在我的设置文件中:
CELERYBEAT_SCHEDULE={
'delivery_send': {
'task': 'delivery.tasks.DeliverySendTask',
'schedule': timedelta(minutes=1),
'args': [],
},
}
CELERY_ROUTES = {
'delivery.tasks.DeliverySendTask': {
'queue': 'periodic_tasks',
'routing_key': 'periodic_tasks'
},
}
Run Code Online (Sandbox Code Playgroud)
有更多的芹菜设置,但我想这两个与这个问题最相关.
任务:
class DeliverySendTask(Task):
def run(self, *args, **kwargs):
logger.info('executing task!')
from .models import Dispatch
Dispatch.objects.all().delete()
Run Code Online (Sandbox Code Playgroud)
然后我跑到python manage.py celery beat控制台看
[2016-06-23 14:32:01,230: INFO/MainProcess] Scheduler: Sending due task delivery_send (delivery.tasks.DeliverySendTask)
Run Code Online (Sandbox Code Playgroud)
但删除从未实际执行过.我错过了什么?
谢谢你的帮助.
django-celery ×10
django ×7
celery ×6
python ×5
rabbitmq ×2
redis ×2
django-views ×1
heroku ×1