标签: celery-task

删除celery/rabbitmq中的所有待处理任务

如何在不知道task_id每个任务的情况下删除所有待处理任务?

task rabbitmq celery celery-task

178
推荐指数
8
解决办法
10万
查看次数

如何手动从shell运行芹菜定期任务?

我正在使用芹菜和django芹菜.我已经定义了一个我想测试的周期性任务.是否可以手动从shell运行周期性任务,以便查看控制台输出?

python django celery celery-task django-celery

68
推荐指数
3
解决办法
3万
查看次数

Celery AttributeError:异步错误

我在我的Mac(OS/X 10.13.4)上本地运行RabbitMQ和Celery,当我运行add.delay(x,y)时,以下代码在本地运行:

#!/usr/bin/env python
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('tasks', \
        broker='pyamqp://appuser:xx@c2/appvhost', \
        backend='db+mysql://appuser:xx@c2/pigpen')

@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs {0.kwargs!r}'.format(self.request))

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试在运行Kali 2018.2的ODROID-C2上运行Celery worker时(w.当前更新,运行时出现以下错误celery -A tasks worker --loglevel=info:

Traceback (most recent call last):
  File "/usr/local/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 14, in main
    _main()
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 326, …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

58
推荐指数
5
解决办法
1万
查看次数

如何让celery任务从任务中失败?

在某些情况下,我想让celery任务从该任务中失败.我尝试了以下方法:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False
Run Code Online (Sandbox Code Playgroud)

但是,该任务仍然报告成功:

任务sim.tasks.run_simulation [9235e3a7-c6d2-4219-bbc7-acf65c816e65]成功于1.17847704887s:False

似乎状态只能在任务运行时修改,一旦完成 - 芹菜将状态更改为它认为的结果(参考此问题).有没有办法,通过提出异常来使任务失败,让芹菜返回任务失败?

celery celery-task

30
推荐指数
3
解决办法
1万
查看次数

Celery Worker错误:ImportError没有名为celery的模块

当我尝试启动我的芹菜工人时,我收到导入错误.我不确定是什么问题.任何帮助将受到高度赞赏.

我的项目:

email/__init__.py
    /celery.py
Run Code Online (Sandbox Code Playgroud)

我尝试通过调用来运行应用程序:

celery worker --app=email
Run Code Online (Sandbox Code Playgroud)

我已经按照这里的所有步骤进行了操作 - http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#about-the-app-argument

追溯:

File "/Users/.../bin/celery", line 9, in <module>
    load_entry_point('celery==3.0.24', 'console_scripts', 'celery')()
File "/Users/.../lib/python2.7/site-packages/celery/__main__.py, line 14, in main
main()
File "/Users/.../lib/python2.7/site-packages/celery/bin/celery.py", line 957, in main
cmd.execute_from_commandline(argv)
File "/Users/.../lib/python2.7/site-packages/celery/bin/celery.py", line 901, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 185, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 300, in setup_app_from_commandline
self.app = self.find_app(app)
File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 317, in find_app
return self.find_app('%s.celery:' % (app.replace(':', ''), ))
File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 311, in find_app
sym …
Run Code Online (Sandbox Code Playgroud)

python celery flask celery-task

25
推荐指数
2
解决办法
5万
查看次数

Celery:列出所有任务,计划,活动*和*完成

更新赏金

如果可能的话,我想要一个不涉及监控线程的解决方案.


我知道我可以使用我的应用程序类查看计划活动任务.InspectControl

i = myapp.control.inspect()

currently_running = i.active()
scheduled = i.scheduled()
Run Code Online (Sandbox Code Playgroud)

但我找不到任何功能来显示已经完成的任务.我知道这些信息至少可以暂时访问,因为我可以通过以下方式查找已完成的任务task_id:

>>> r = my task.AsyncResult(task_id=' ... ')
>>> r.state
u'SUCCESS'
Run Code Online (Sandbox Code Playgroud)

如何获得计划,活动和已完成任务的完整列表?或者可能一次列出所有任务?

celery celery-task

22
推荐指数
1
解决办法
1万
查看次数

Celery任务在1970年具有time_start属性

检查当前运行的Celery任务会发现一个奇怪的time_start时间戳:

>> celery.app.control.inspect().active()
{u'celery@worker.hostname': [{u'acknowledged': True,
   u'args': u'(...,)',
   u'delivery_info': {u'exchange': u'celery',
    u'priority': 0,
    u'redelivered': None,
    u'routing_key': u'celery'},
   u'hostname': u'celery@worker.hostname',
   u'id': u'3d92fdfd-524e-4ba1-98cb-cf83af2ad8e9',
   u'kwargs': u'{}',
   u'name': u'task_name',
   u'time_start': 9636801.218162088,
   u'worker_pid': 7931}]}
Run Code Online (Sandbox Code Playgroud)

time_start属性将任务追溯到1970年(那是在创建Celery,Python之前,我没有自定义的DeLorean):

>> from datetime import datetime
>> datetime.fromtimestamp(9636801.218162088)
datetime.datetime(1970, 4, 22, 13, 53, 21, 218162)
Run Code Online (Sandbox Code Playgroud)

我是否误解了这个time_task属性?我的Celery应用程序配置错误了吗?

我在Linux上使用带有Django应用程序和Redis后端的Celery 3.1.4.

任务由执行的工作程序运行,如下所示:

./manage.py celery worker --loglevel=INFO --soft-time-limit=600 --logfile=/tmp/w1.log --pidfile=/tmp/w1.pid -n 'w1.%%h'
Run Code Online (Sandbox Code Playgroud)

python django redis celery celery-task

21
推荐指数
1
解决办法
3396
查看次数

芹菜停止执行链

我有一个check_orders任务,定期执行.它创建了一组任务,以便我可以计算执行任务所花费的时间,并在完成任务时执行某些操作(这是res.join [1]和grouped_subs的目的)分组的任务是成对的链式任务.

我想要的是当第一个任务不满足条件(失败)时不执行链中的第二个任务.对于我的生活,我无法理解这一点,我觉得这对于作业队列管理器来说是非常基本的功能.当我尝试在[2]之后注释掉的东西(引发异常,删除回调)......由于某种原因,我们在check_orders中遇到了join()(它打破了组).我已经尝试将ignore_result设置为False以及所有这些任务,但它仍然不起作用.

@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), 
                        notify.subtask((args_sub_2, ), immutable=True)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id))

@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task django-celery celerybeat

20
推荐指数
3
解决办法
6825
查看次数

芹菜中的导入错误

这是我正在运行的代码:

from __future__ import absolute_import
from celery import Celery
celery1 = Celery('celery',broker='amqp://',backend='amqp://',include=['tasks'])

celery1.conf.update(
   CELERY_TASK_RESULT_EXPIRES=3600,
       )
if __name__ == '__main__':
 celery1.start()
Run Code Online (Sandbox Code Playgroud)

当我执行上面的代码时,它给我以下错误:

   ImportError: cannot import name Celery
Run Code Online (Sandbox Code Playgroud)

python celery pycharm celery-task

19
推荐指数
1
解决办法
2万
查看次数

气流 - Python文件不在同一个DAG文件夹中

我试图使用Airflow来执行一个简单的任务python.

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

如果我尝试,例如:

气流测试python_test打印2015-01-01

有用!

现在我想把我的def print_context(ds, **kwargs)函数放在其他python文件中.所以我创建了名为:simple_test.py的antoher文件并更改:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

现在我尝试再次运行:

气流测试python_test打印2015-01-01

好的!它仍然有效!

但是,如果我创建一个模块,例如,带有文件的worker模块 …

python celery celery-task airflow

17
推荐指数
3
解决办法
2万
查看次数