标签: celery-task

重置Celery任务的倒计时

我需要在倒计时后运行芹菜任务,但能够在某些条件下重置倒计时.例如,我要打电话apply_asynccountdown=15,但如果某些事件发生时执行任务之前,我想再次设定倒计时15秒.

从文档和谷歌上搜索我想我可以通过保存的任务ID,然后撤销,如果事件发生重建任务的完成这个任务.我想知道是否有更优雅的方法.

django celery celery-task

8
推荐指数
1
解决办法
631
查看次数

如何使用redis经纪人从芹菜中删除任务?

我有添加一些错误taskcelery使用Redis的broker

但现在我想删除不正确的task,我找不到任何方法来做到这一点

是否有一些命令或一些api这样做?

python celery celery-task celeryd

8
推荐指数
1
解决办法
1万
查看次数

芹菜:在特定时间间隔后执行任务

我想在他们注册后48小时向我的用户发送一封电子邮件.如何使用芹菜实现这一目标?如果我创建一个定期任务来发送电子邮件,我将必须决定我想要执行该任务的特定时间.我不想每秒都运行芹菜任务来检查是否需要发送任何电子邮件.

django celery celery-task django-celery celerybeat

8
推荐指数
1
解决办法
3470
查看次数

从未知任务中检索Celery中'task_id'的结果

如果我以前不知道执行了哪个任务,如何提取任务的结果?这是设置:给定以下源('tasks.py'):

from celery import Celery

app = Celery('tasks', backend="db+mysql://u:p@localhost/db", broker = 'amqp://guest:guest@localhost:5672//')

@app.task
def add(x,y):
   return x + y


@app.task
def mul(x,y):
   return x * y
Run Code Online (Sandbox Code Playgroud)

RabbitMQ 3.3.2在本地运行:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server

              RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.
Run Code Online (Sandbox Code Playgroud)

与Celery 3.1.12在本地运行:

 -------------- celery@Marcs-MacBook-Pro.local v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.2.0-x86_64-i386-64bit
-- * - **** …
Run Code Online (Sandbox Code Playgroud)

python jobs celery celery-task

8
推荐指数
1
解决办法
4570
查看次数

使用主管运行芹菜作为守护程序不起作用

我有一个django应用程序,它有芹菜功能,所以我能够成功运行芹菜如下

celery -A tasks worker --loglevel=info
Run Code Online (Sandbox Code Playgroud)

但作为一个众所周知的事实,我们需要将它作为守护进程运行,所以我在celery.conf文件/etc/supervisor/conf.d/夹中写了下面的文件

; ==================================
;  celery worker supervisor example
; ==================================

[program:celery]
; Set full path to celery program if using virtualenv
command=/root/Envs/proj/bin/celery -A app.tasks worker --loglevel=info

user=root
environment=C_FORCE_ROOT="yes"
environment=HOME="/root",USER="root"
directory=/root/apps/proj/structure
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program …
Run Code Online (Sandbox Code Playgroud)

django celery celery-task supervisord

8
推荐指数
1
解决办法
1万
查看次数

如何按名称限制运行Celery任务的最大数量

如何限制可以同时运行的特定Celery任务的实例数?

我有一个处理大文件的任务.我遇到了一个问题,用户可能会启动多个任务,导致服务器在尝试一次处理太多文件时耗尽CPU和内存.我想确保在任何给定时间只运行这种类型任务的N个实例,并且其他任务将在调度程序中排队等待,直到其他任务完成.

我看到任务装饰器中有一个rate_limit选项,但我认为这不符合我的要求.如果我正确理解文档,这只会限制任务的启动速度,但不会限制运行任务的总数,因此这会使我的服务器崩溃得更慢......但它仍然会尽管如此.

python celery celery-task

8
推荐指数
2
解决办法
2674
查看次数

When I use Django Celery apply_async with eta, it does the job immediately

i looked at celery documentation and trying something from it but it not work like the example. maybe i'm wrong at some point, please give me some pointer if i'm wrong about the following code

in views.py i have something like this:

class Something(CreateView):
  model = something

  def form_valid(self, form):
    obj = form.save(commit=False)
    number = 5
    test_limit = datetime.now() + timedelta(minutes=5)
    testing_something.apply_async((obj, number), eta=test_limit)
    obj.save()
Run Code Online (Sandbox Code Playgroud)

and in celery tasks i wrote something like this:

@shared_task()
def add_number(obj, number):
    base = …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task django-celery

8
推荐指数
1
解决办法
5192
查看次数

无法在 Celery 中使用 SQLite 作为后端

我有一个简单的项目,没有任何框架,如 Django 或 Flask。

我的配置是这样的:

CELERY_BROKER_BACKEND = "sqlakombu.transport.Transport"
CELERY_BROKER_URL = "db+sqlite:///results.sqlite"
CELERY_CACHE_BACKEND = "db+sqlite:///results.sqlite"
CELERY_RESULT_BACKEND = "db+sqlite:///results.sqlite"
Run Code Online (Sandbox Code Playgroud)

我已经安装了软件包:

SQLAlchemy==1.2.8
kombu==4.2.0
kombu-sqlalchemy==1.1.0
celery==4.1.1
Run Code Online (Sandbox Code Playgroud)

当我尝试启动芹菜时:

celery worker -A proj --loglevel=info
Run Code Online (Sandbox Code Playgroud)

我收到这个错误:

File "/home/cyildiz/Projects/env/sample/local/lib/python2.7/site-packages/kombu/connection.py", line 582, in get_transport_cls
transport_cls = get_transport_cls(transport_cls)
File "/home/cyildiz/Projects/env/sample/local/lib/python2.7/site-packages/kombu/transport/__init__.py", line 83, in get_transport_cls     _transport_cache[transport] = resolve_transport(transport)
File "/home/cyildiz/Projects/env/sample/local/lib/python2.7/site-packages/kombu/transport/__init__.py", line 64, in resolve_transport
raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: db'
Run Code Online (Sandbox Code Playgroud)

当我将设置更改为:

CELERY_BROKER_BACKEND = "sqlakombu.transport.Transport"
CELERY_BROKER_URL = "sqla+sqlite:///results.sqlite"
CELERY_CACHE_BACKEND = "sqla+sqlite:///results.sqlite"
CELERY_RESULT_BACKEND = "sqla+sqlite:///results.sqlite"
Run Code Online (Sandbox Code Playgroud)

错误是:

File "/usr/lib/python2.7/importlib/__init__.py", line …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

8
推荐指数
1
解决办法
7487
查看次数

如何在 Django 中等待 celery 任务的结果

我有一个 celery 任务,它将数据发送到另一个服务。我已将芹菜任务添加send_inventory_requestRequestSupplyStock基于类的视图中。当我发帖时,我应该首先从 celery 任务获取结果,然后继续并返回响应。我想首先等待 celery 任务的结果,然后从 post 方法返回响应,这是实现此目的的正确方法。

@app.task
def send_inventory_request(payload,token):
    auth = {'authorization':token}
    HEADERS.update(auth)
    url = PROCUREMENT_SUPPLY_STOCK_REQUESTS_URL
    res = requests.post(url,json=payload,headers=HEADERS)
    inventory_request_data = res.json()
    x= logger.info('Supply Stock Request {0} + {1}'.format(payload,token))
    print(x)
    return inventory_request_data 
Run Code Online (Sandbox Code Playgroud)

看法

class RequestSupplyStock(generics.CreateAPIView):

      def post(self, request, format=None):
          ........

          send_inventory_request.delay(payload,get_token(request))

          .........
          return Response(status=status.HTTP_201_CREATED)
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task django-rest-framework

8
推荐指数
1
解决办法
2万
查看次数

访问芹菜任务导致组内的链

我与芹菜的工作流程是:

group1 = (task1, task2)
chain2 = (task3, group1)
group3 = (task4, task5, chain2)
Run Code Online (Sandbox Code Playgroud)

当我开始时group3,一切都很顺利:所有的任务都是用我需要的"依赖"来执行的.任务执行一些操作,然后返回一个布尔值.我想查看每项任务的结果.不幸的是,我无法检索所有结果:

group3.results
Run Code Online (Sandbox Code Playgroud)

收益:

True, True, tuple
Run Code Online (Sandbox Code Playgroud)

元组是这样的:

('8a8b7c2c-db44-4096-ba29-93ad2cd63409', [('576966ec-0ce5-4d82-9ab5-a23da805299b', None), ('777c77a3-34d6-4021-943f-8c39e7e87311', None)])
Run Code Online (Sandbox Code Playgroud)

我不能像链式结果那样处理它.如果我创建一个asyncresult与ID 8a8b7c2c-db44-4096-ba29-93ad2cd63409,我只能访问子任务的结果组(即:我得到的task1task2结果,但没办法得到task3的结果).

这个方法非常复杂,但我在celery文档中找不到具体的东西,在那里我找到了检索简单组/链结果的所有方法.

鉴于我真的了解工作流程,访问所有结果的最佳方法是什么?

celery celery-task django-celery

7
推荐指数
0
解决办法
841
查看次数