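How can I delete all pending tasks without knowing the task_id of each task?
I am using Celery and django-celery. I have defined a periodic task that I would like to test. Is it possible to run the periodic task manually from the shell so that I can see the console output?
I am running RabbitMQ and Celery locally on my Mac (OS X 10.13.4), and the following code works locally when I run add.delay(x, y):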
#!/usr/bin/env python
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks',
             broker='pyamqp://appuser:xx@c2/appvhost',
             backend='db+mysql://appuser:xx@c2/pigpen')
@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs {0.kwargs!r}'.format(self.request))
@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y
However, when I try to run a Celery worker on an ODROID-C2 running Kali 2018.2 (with current updates), I get the following error when running celery -A tasks worker --loglevel=info:
Traceback (most recent call last):
  File "/usr/local/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 14, in main
    _main()
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 326, …
In some cases I want to make a Celery task fail from within that task. I tried the following:
from celery.task import task
from celery import states
@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False
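However, the task still reports success:
Task sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65] succeeded in 1.17847704887s: False
It seems that the state can only be modified while the task is running; once it completes, Celery changes the state to whatever it deems the outcome to be (see this question). Is there a way to fail the task by raising an exception so that Celery reports the task as failed?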
When I try to start my Celery worker, I get an import error. I am not sure what the problem is. Any help would be highly appreciated.
My project:
email/__init__.py
     /celery.py
I try to run the application by calling:
celery worker --app=email
I have followed all the steps here: http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#about-the-app-argument
Traceback:
  File "/Users/.../bin/celery", line 9, in <module>
    load_entry_point('celery==3.0.24', 'console_scripts', 'celery')()
  File "/Users/.../lib/python2.7/site-packages/celery/__main__.py", line 14, in main
    main()
  File "/Users/.../lib/python2.7/site-packages/celery/bin/celery.py", line 957, in main
    cmd.execute_from_commandline(argv)
  File "/Users/.../lib/python2.7/site-packages/celery/bin/celery.py", line 901, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 185, in execute_from_commandline
    argv = self.setup_app_from_commandline(argv)
  File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 300, in setup_app_from_commandline
    self.app = self.find_app(app)
  File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 317, in find_app
    return self.find_app('%s.celery:' % (app.replace(':', ''), ))
  File "/Users/.../lib/python2.7/site-packages/celery/bin/base.py", line 311, in find_app
    sym …
Bounty update
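If possible, I would like a solution that does not involve a monitoring thread.
I know I can view scheduled and active tasks through the Inspect class of my app's Control.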
i = myapp.control.inspect()
currently_running = i.active()
scheduled = i.scheduled()
But I cannot find any function that shows tasks that have already finished. I know this information must be accessible, at least temporarily, because I can look up a finished task by its task_id:
>>> r = my_task.AsyncResult(task_id=' ... ')
>>> r.state
u'SUCCESS'
How can I get a complete list of scheduled, active, and finished tasks? Or possibly a list of all tasks at once?
Inspecting a currently running Celery task reveals a strange time_start timestamp:
>> celery.app.control.inspect().active()
{u'celery@worker.hostname': [{u'acknowledged': True,
                              u'args': u'(...,)',
                              u'delivery_info': {u'exchange': u'celery',
                                                 u'priority': 0,
                                                 u'redelivered': None,
                                                 u'routing_key': u'celery'},
                              u'hostname': u'celery@worker.hostname',
                              u'id': u'3d92fdfd-524e-4ba1-98cb-cf83af2ad8e9',
                              u'kwargs': u'{}',
                              u'name': u'task_name',
                              u'time_start': 9636801.218162088,
                              u'worker_pid': 7931}]}
The time_start attribute dates the task back to 1970 (that is before the creation of Celery and of Python, and I do not own a customised DeLorean):
>> from datetime import datetime
>> datetime.fromtimestamp(9636801.218162088)
datetime.datetime(1970, 4, 22, 13, 53, 21, 218162)
Am I misinterpreting the time_start attribute? Is my Celery app misconfigured?
I am using Celery 3.1.4 on Linux with a Django app and a Redis backend.
The tasks are run by a worker that is started as follows:
./manage.py celery worker --loglevel=INFO --soft-time-limit=600 --logfile=/tmp/w1.log --pidfile=/tmp/w1.pid -n 'w1.%%h'
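One plausible explanation for the 1970 date, stated here as an assumption rather than something the question confirms: Celery 3.1 may report time_start as a reading of the worker's monotonic clock (seconds since an arbitrary point such as boot), not as a Unix timestamp. Under that assumption, a minimal sketch of the conversion could look like this. The helper name monotonic_to_datetime and its parameters are hypothetical, it relies on Python 3's time.monotonic, and it only gives a meaningful answer when run on the same host as the worker.

```python
import time
from datetime import datetime


def monotonic_to_datetime(time_start, now_monotonic=None, now_wall=None):
    """Convert a monotonic-clock reading into an approximate local datetime.

    Assumption: time_start was taken from the same monotonic clock that
    time.monotonic() reads on this machine.
    """
    if now_monotonic is None:
        now_monotonic = time.monotonic()
    if now_wall is None:
        now_wall = time.time()
    # Seconds that have passed since the task started.
    elapsed = now_monotonic - time_start
    # Subtract that from the current wall-clock time.
    return datetime.fromtimestamp(now_wall - elapsed)


# Usage with explicit clock readings: the task started 60 seconds
# before the moment at which both clocks were sampled.
started_at = monotonic_to_datetime(40.0, now_monotonic=100.0, now_wall=1000000.0)
print(started_at)
```

Passing both clock readings explicitly keeps the arithmetic easy to check. In real use the two optional arguments would be left out so the current clocks are sampled.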
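I have a check_orders task that is executed periodically. It creates a group of tasks so that I can time how long the tasks took to execute and do something when they are all done (this is the purpose of res.join [1] and grouped_subs). The grouped tasks are pairs of chained tasks.
What I want is for the second task in the chain not to be executed when the first task does not meet a condition (fails). I cannot figure this out for the life of me, and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks), we get stuck on the join() in check_orders for some reason (it breaks the group). I have also tried setting ignore_result to False on all these tasks, but it still does not work.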
@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []
    for thingy in things:
        ...
        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )),
                                  notify.subtask((args_sub_2, ), immutable=True)))
    res = group(grouped_subs).apply_async()
    res.join() #[1]
    logger.info('Done checking orders at %s' % current_task.request.id)
@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following …
This is the code I am running:
from __future__ import absolute_import
from celery import Celery
celery1 = Celery('celery', broker='amqp://', backend='amqp://', include=['tasks'])
celery1.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
    celery1.start()
When I run the code above, it gives me the following error:
ImportError: cannot import name Celery
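A common cause of this kind of ImportError, offered here as an assumption because the question does not show the file name, is that the script or a neighbouring file is itself called celery.py, so `from celery import Celery` loads the local file instead of the installed package. The sketch below reproduces the effect in Python 3 with the standard-library module colorsys as a stand-in, so it runs without Celery installed. The helper find_origin and the temporary file are hypothetical illustrations.

```python
import importlib
import importlib.util
import os
import sys
import tempfile


def find_origin(name):
    """Return the file Python would load for module `name`, or None."""
    importlib.invalidate_caches()
    sys.modules.pop(name, None)  # forget any copy imported earlier
    spec = importlib.util.find_spec(name)
    return spec.origin if spec else None


# Where the real module normally comes from.
real_origin = find_origin("colorsys")

with tempfile.TemporaryDirectory() as project_dir:
    # A local file that happens to share the library's name.
    shadow_path = os.path.join(project_dir, "colorsys.py")
    with open(shadow_path, "w") as fh:
        fh.write("# empty local module\n")

    # Running a script puts its own directory first on sys.path.
    sys.path.insert(0, project_dir)
    try:
        shadow_origin = find_origin("colorsys")
        try:
            from colorsys import rgb_to_hsv  # name exists only in the real module
            import_error = None
        except ImportError as exc:
            import_error = str(exc)
    finally:
        sys.path.remove(project_dir)
        sys.modules.pop("colorsys", None)

print("normally loaded from:   ", real_origin)
print("with local file present:", shadow_origin)
print("resulting error:        ", import_error)
```

If the same check on the real project shows celery resolving to a file inside the project directory, renaming that file (and deleting any stale .pyc next to it) would be the first thing to try.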
I am trying to use Airflow to execute a simple Python task.
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
If I try, for example:
airflow test python_test print 2015-01-01
It works!
Now I want to put my def print_context(ds, **kwargs) function in another Python file. So I created another file called simple_test.py and changed:
run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
Now I try to run it again:
airflow test python_test print 2015-01-01
OK! It still works!
But if I create a module, for example a worker module with a file …