标签: django-celery

我如何将 Celery 任务转换为 Google Cloud 任务 GCP (Django)

我在本地主机上使用 Celery + Redis 和 Django Rest API 来运行分类任务,如何从 Axios 帖子获取数据。现在我正在尝试将其部署到谷歌云,并且我没有找到在App Engine上运行Redis和Celery的明确方法,所以我听说了Google任务队列,但我没有找到将其添加到视图和触发器的方法当视图被调用时,我如何创建一个函数来调用我在 celery 上的这个谷歌云任务,或者只是知道如何做到这一点,这些是我的代码:

from celery import shared_task
from celery_progress.backend import ProgressRecorder

from snakeimage.models import Prediction,UploadedSnake,SnakeClass
from snakeimage.classification_codes.classification_codes.prediction_func import predict_classes

#import json
#import time

#from django.conf import settings

#from google.cloud import tasks_v2beta3
#from google.protobuf import timestamp_pb2

@shared_task(bind=True)
 def image_progress(self,image_path, X, Y, metadata,image_id):
 progress_recorder = ProgressRecorder(self,)
 predictions = predict_classes(image_path, X, Y, metadata)
 print(predictions)
   for prediction in predictions:
      print(prediction[0])
      image = UploadedSnake.objects.get(id=image_id)
      class_name = SnakeClass.objects.get(index=(prediction[0]+1))
      print('>>>>>>>>>>>>>>>>>>>>>',prediction[1])
     
 Prediction.objects.create(image=image,class_name=class_name,predict_percent=prediction[1])
 progress_recorder.set_progress( 1, 3, description='Prediction Result …
Run Code Online (Sandbox Code Playgroud)

google-app-engine django-views celery django-celery google-cloud-tasks

3
推荐指数
1
解决办法
4223
查看次数

Django,导入错误:无法从“celery”导入名称“任务”

我有使用 Celery 版本 4.4.2 的 Django 应用程序,它运行良好。

from celery import task
import logging


@task(ignore_result=True)
def log_user_activity(user_id):
    try:
        logging.info(user_id)
    except Exception as e:
        logging.error(str(e))
Run Code Online (Sandbox Code Playgroud)

当我尝试将 Celery 版本更新到 v5.2.2 时,出现以下错误:

ImportError:无法从“celery”导入名称“task”

有人可以帮忙任务被替换成什么吗?他们这里还有同样的例子。 https://github.com/celery/celery/blob/v5.2.2/examples/celery_http_gateway/tasks.py

python django django-celery

3
推荐指数
1
解决办法
7477
查看次数

Django Celery Redis

我有这个错误A rediss:// URL must have parameter ssl_cert_reqs and this must be set to CERT_REQUIRED, CERT_OPTIONAL, or CERT_NONE

设置

CELERY_BROKER_URL = os.environ.get('REDIS_URL', "redis://localhost:6379")
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL', "redis://localhost:6379")
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
Run Code Online (Sandbox Code Playgroud)

芹菜.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
import os, ssl

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')

app = Celery(
'myproj',
broker_use_ssl = {
    'ssl_cert_reqs': ssl.CERT_NONE
})

app.config_from_object('django.conf:settings', namespace="CELERY")

app.autodiscover_tasks()
Run Code Online (Sandbox Code Playgroud)

我将 ssl_cert_reqs 的值更改为不同的值“none”,“cert_required”,...,但什么也没有,当我使用 rediss:// 而不是 redis:// 时总是出现相同的错误。

django heroku redis django-celery

3
推荐指数
1
解决办法
2407
查看次数

芹菜和Redis后端的问题

我目前正在设置一个系统,它使用带有redis后端的芹菜来执行一系列异步任务,例如发送电子邮件,提取社交数据,抓取等等.一切都很好,但我正在寻找如何监控系统(也就是排队消息的数量).我开始浏览芹菜来源,但我想我会在这里发布我的问题:首先,这是我的配置:

BROKER_BACKEND                  = "redis" 
BROKER_HOST                     = "localhost" 
BROKER_PORT                     = 6379 
BROKER_VHOST                    = "1" 
REDIS_CONNECT_RETRY     = True 
REDIS_HOST                              = "localhost" 
REDIS_PORT                              = 6379 
REDIS_DB                                = "0" 
CELERY_SEND_EVENTS                      = True 
CELERYD_LOG_LEVEL               = 'INFO' 
CELERY_RESULT_BACKEND           = "redis" 
CELERY_TASK_RESULT_EXPIRES      = 25 
CELERYD_CONCURRENCY             = 8 
CELERYD_MAX_TASKS_PER_CHILD = 10 
CELERY_ALWAYS_EAGER                     =True
Run Code Online (Sandbox Code Playgroud)

我要做的第一件事是监视队列中有多少消息.我假设,在幕后,redis后端只是从列表中推送/弹出,虽然我似乎无法在代码中找到它.所以我模拟了一个模拟,我开始大约100个任务,并试图在redis中找到它们:我的celeryd运行如下:python manage.py celeryd -c 4 --loglevel = DEBUG -n XXXXX --logfile = logs/celery.log所以我应该一次只有4个并发工作者......我不明白的两件事:问题1:排队完成100个任务后,在redis上查找它们,我只看到以下内容:

$ redis-cli 
redis 127.0.0.1:6379> keys * 
1) "_kombu.binding.celery" 
redis 127.0.0.1:6379> select 1 
OK 
redis 127.0.0.1:6379[1]> keys * 
1) "_kombu.binding.celery" 
2) "_kombu.binding.celeryd.pidbox" 
redis …
Run Code Online (Sandbox Code Playgroud)

python django redis celery django-celery

2
推荐指数
1
解决办法
5871
查看次数

Django芹菜减少时间,5小时完成1000个任务

我正在开发环境中运行,所以这在生产中可能有所不同,但是当我从Django Celery运行任务时,它似乎每隔10-20秒才从代理中获取任务.我只是在这一点上进行测试但是假设我发送大约1000个任务,这意味着它需要花费5个多小时才能完成.

这是正常的吗?应该更快吗?或者我做错了什么?

这是我的任务

class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        logging.debug("About to send a message.")

        # Because we don't always have control over transactions
        # in our calling code, we will retry up to 10 times, every 3
        # seconds, in order to try to allow for the commit to the database
        # to finish. That gives the server 30 seconds to write all of
        # the data to …
Run Code Online (Sandbox Code Playgroud)

django django-celery

2
推荐指数
1
解决办法
285
查看次数

如何使用TTL设置Celery,以清除队列中的旧消息?

我正在使用Celery,Django和RabbitMQ。我正在寻找一种自动从RabbitMQ的celery队列中清除旧邮件的方法。因此,无论何时默认情况下创建队列,都应清除所有早于x秒的消息。

这些是我在settings.py中设置的变量。

CELERY_DEFAULT_EXCHANGE = 'celery'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'celery'
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_EVENT_QUEUE_TTL= 30
Run Code Online (Sandbox Code Playgroud)

我也尝试过这样做,但是导致没有消息发送到RabbitMQ。

CELERY_QUEUES = (                                                                                                                                                                 
Queue('celery', routing_key='celery', queue_arguments={'x-message-ttl': 30}),                                                                                                 
Queue('default', routing_key='default', queue_arguments={'x-message-ttl': 30}),                                                                                               
)
Run Code Online (Sandbox Code Playgroud)

任何想法都欢迎。

django rabbitmq celery django-celery

2
推荐指数
1
解决办法
1856
查看次数

Celery:无法使用新用户名连接远程工作人员

我是第一次将远程工作人员连接到我的Celery服务器(Django).在我的服务器上,我为用户创建了一个新的用户名和密码,并设置了权限:

# rabbitmqctl add_user adcelery pwd
# rabbitmqctl set_permissions adcelery "^adcelery-.*" ".*" ".*"
# rabbitmqctl list_users
Listing users ...
guest   [administrator]
adcelery    []
...done.
# /etc/init.d/rabbitmq-server restart
# /etc/init.d/celeryd restart
Run Code Online (Sandbox Code Playgroud)

我的远程工作者的URL:

BROKER_URL = "amqp://adcelery:pwd@mydomain.com/"
Run Code Online (Sandbox Code Playgroud)

我在远程工作者的启动时收到以下错误.当我在BROKER_URL上面设置"guest:guest"作为我的登录时,它连接完全正常.我确定我错过了一两步,有什么建议吗?

[2014-01-12 11:31:26,188: INFO/MainProcess] Connected to amqp://adcelery@awaaz.de:5672//
[2014-01-12 11:31:26,391: ERROR/MainProcess] Unrecoverable error: AccessRefused(403, u"ACCESS_REFUSED - access to exchange 'celeryev' in vhost '/' refused f
or user 'adcelery'", (40, 10), 'Exchange.declare')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", …
Run Code Online (Sandbox Code Playgroud)

rabbitmq celery django-celery

2
推荐指数
1
解决办法
2202
查看次数

如何在django-celery中使用.delay()方法?

我想使用.delay来实现异步行为.使用它的主要原因是为了加快我的观点.我做错了吗?如果是这样,我该怎么做呢?

以下是示例代码:

View.py

@cache_page(60*60*24)
def my_view(request):
    something ..... .... ....
    a = SomeModel.objects.get(pk=id)
    data = celery_task.delay(a)
    return dumpjson(status='ok', data=data, callback=callback)
Run Code Online (Sandbox Code Playgroud)

Task.py

def celery_task(a):
    res = request.get('http:sample.sample.com/feed/result' params={'abc': 'abc'})
    return {'response': res}
Run Code Online (Sandbox Code Playgroud)

如果我从celery_task带来响应,它会显示一些guid(1b52f519-64cb-43da-844a-2886bcccb9bc),错误是这样的:

<EagerResult: 1b52f519-64cb-43da-844a-2886bcccb9bc> is not JSON serializable
Run Code Online (Sandbox Code Playgroud)

python django celery django-celery

2
推荐指数
1
解决办法
1万
查看次数

Celery检查队列是否正在执行

我必须产生芹菜任务,它必须有一些命名空间(例如用户ID).所以我产生了它

scrapper_start.apply_async((request.user.id,), queue=account.Account_username)
app.control.add_consumer(account.Account_username, reply=True)
Run Code Online (Sandbox Code Playgroud)

并且任务从其他任务递归地产生.现在我必须检查队列的任务是否正在执行.试图检查redis中的列表长度,它在celery开始执行之前返回真实数字.如何解决这个问题呢.我只需要检查,如果队列或消费者正在执行或已经为空.谢谢

python multithreading celery django-celery

2
推荐指数
1
解决办法
2086
查看次数

Django +芹菜 - 为什么我的周期性任务没有运行?

我正在使用django + celery进行定期任务.我的项目有django-celery == 3.1.17,使用的代理是Redis,它正在工作.

在我的设置文件中:

CELERYBEAT_SCHEDULE={
    'delivery_send': {
        'task': 'delivery.tasks.DeliverySendTask',
        'schedule': timedelta(minutes=1),
        'args': [],
    },
}

CELERY_ROUTES = {
    'delivery.tasks.DeliverySendTask': {
        'queue': 'periodic_tasks',
        'routing_key': 'periodic_tasks'
    }, 
}
Run Code Online (Sandbox Code Playgroud)

有更多的芹菜设置,但我想这两个与这个问题最相关.

任务:

class DeliverySendTask(Task):
    def run(self, *args, **kwargs):   
        logger.info('executing task!')     
        from .models import Dispatch
        Dispatch.objects.all().delete()
Run Code Online (Sandbox Code Playgroud)

然后我跑到python manage.py celery beat控制台看

[2016-06-23 14:32:01,230: INFO/MainProcess] Scheduler: Sending due task delivery_send (delivery.tasks.DeliverySendTask)
Run Code Online (Sandbox Code Playgroud)

但删除从未实际执行过.我错过了什么?

谢谢你的帮助.

python django django-celery

2
推荐指数
1
解决办法
2621
查看次数