我试图使用Airflow来执行一个简单的任务python.
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
如果我尝试,例如:
气流测试python_test打印2015-01-01
有用!
现在我想把我的def print_context(ds, **kwargs)函数放在其他python文件中.所以我创建了名为:simple_test.py的antoher文件并更改:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
现在我尝试再次运行:
气流测试python_test打印2015-01-01
好的!它仍然有效!
但是,如果我创建一个模块,例如,带有文件的worker模块 …
我有一个芹菜链,可以完成一些任务.每个任务都可能失败并重试.请参阅下面的快速示例:
from celery import task
@task(ignore_result=True)
def add(x, y, fail=True):
try:
if fail:
raise Exception('Ugly exception.')
print '%d + %d = %d' % (x, y, x+y)
except Exception as e:
raise add.retry(args=(x, y, False), exc=e, countdown=10)
@task(ignore_result=True)
def mul(x, y):
print '%d * %d = %d' % (x, y, x*y)
Run Code Online (Sandbox Code Playgroud)
和链:
from celery.canvas import chain
chain(add.si(1, 2), mul.si(3, 4)).apply_async()
Run Code Online (Sandbox Code Playgroud)
运行这两个任务(并假设没有任何失败),你会得到/看到打印:
1 + 2 = 3
3 * 4 = 12
Run Code Online (Sandbox Code Playgroud)
但是,当添加任务第一次失败并在后续重试调用中成功时,链中的其余任务不会运行,即添加任务失败,链中的所有其他任务都不会运行,几秒钟后,添加任务再次运行并成功,链中的其余任务(在本例中为mul.si(3,4))不会运行.
芹菜是否提供了从失败的任务继续失败链的方法?如果没有,那么实现这一目标的最佳方法是什么,并确保链的任务以指定的顺序运行,并且只有在前一个任务成功执行后,即使任务被重试几次?
注1:问题可以解决
add.delay(1, 2).get()
mul.delay(3, 4).get()
Run Code Online (Sandbox Code Playgroud)
但我有兴趣了解为什么链不能用于失败的任务.
在celeryd-multi的文档中,我们找到了这个例子:
# Advanced example starting 10 workers in the background:
# * Three of the workers processes the images and video queue
# * Two of the workers processes the data queue with loglevel DEBUG
# * the rest processes the default' queue.
$ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
-Q default -L:4,5 DEBUG
Run Code Online (Sandbox Code Playgroud)
(从这里:http://docs.celeryproject.org/en/latest/reference/celery.bin.celeryd_multi.html#examples)
什么是一个实际的例子,说明为什么在一个主机上有多个工作程序处理相同的队列是好的,如上例所示?这不是设置并发性的原因吗?
更具体地说,以下两行(A和B)之间是否存在实际差异?:
A:
$ celeryd-multi start 10 -c 2 -Q data
Run Code Online (Sandbox Code Playgroud)
B:
$ celeryd-multi start 1 -c 20 …Run Code Online (Sandbox Code Playgroud) 我正在创建一个eta范围在3到20小时之间的任务,当我查看工作日志时,对于此任务,工作人员说" Got task from broker: ..."在收到原始任务之后每小时都会到达eta.
我知道这与设置BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': X}X是以秒为单位的数字有关.
所以我玩了visibility_timeout,如果我把它设置为不到1小时的任何时间,那么我可以看到工作者每X秒获得相同的任务,但是当我将visibility_timeoutX 设置为大于1小时时,它会保持默认为1h而不管我设定的时间.
还有其他人遇到过这个问题吗?这是一个知道错误吗?
我正在使用Redis服务器版本2.4.15的Celery 3.0.11(Chiastic Slide)
这段代码是我的芹菜工人脚本:
from app import celery, create_app
app = create_app('default')
app.app_context().push()
Run Code Online (Sandbox Code Playgroud)
当我尝试运行worker时,我会遇到这个错误:
File "/home/vagrant/myproject/venv/app/mymail.py", line 29, in send_email_celery
msg.html = render_template(template + '.html', **kwargs)
File "/home/vagrant/myproject/venv/local/lib/python2.7/site-packages/flask/templating.py", line 126, in render_template
ctx.app.update_template_context(context)
File "/home/vagrant/myproject/venv/local/lib/python2.7/site-packages/flask/app.py", line 716, in update_template_context
context.update(func())
TypeError: 'NoneType' object is not iterable
Run Code Online (Sandbox Code Playgroud)
我的问题是,在芹菜中使用工作人员时,如何发送电子邮件任务.
mymail.py
from flask import current_app, render_template
from flask.ext.mail import Message
from . import mail, celery
@celery.task
def send_async_email_celery(msg):
mail.send(msg)
def send_email_celery(to, subject, template, **kwargs):
app = current_app._get_current_object()
msg = Message(subject, sender=app.config['MAIL_SENDER'], recipients=[to])
msg.html = render_template(template + …Run Code Online (Sandbox Code Playgroud) 我想利用Celery(使用RabbitMQ作为后端MQ)通过不同的队列执行不同风格的任务.一个要求是来自特定队列的消费(由工作人员)应该具有暂停和恢复的能力.
Celery似乎通过调用和具有此功能.虽然我能够从特定工作人员的队列中取消任务的消耗,但我无法让工作人员通过呼叫恢复消费. 此处提供了重现此问题的代码.我的猜测很可能是我在启动工作人员时缺少某种参数或通过参数提供的参数? add_consumercancel_consumeradd_consumerceleryconfig
能够获得一些新鲜的眼睛会很棒.Stackoverflow关于add_consumer和Github的讨论不多.所以我希望这里有一些专家愿意分享他们的想法/经验.
-
我正在运行以下内容:
Windows操作系统,RabbitMQ 3.5.6,Erlang 18.1,Python 3.3.5,芹菜3.1.15
多天后,我有一个工作芹菜和芹菜节拍任务列表,结果使用django_celery_results存储.但是,当我查看表记录时,它没有任何有用的信息.
是否可以将任务ID设置为人类可读的东西?
一个例子是使用demo任务,它返回no,但是一个不可读的任务id
tasks.py
@app.task
def test(a,b):
return a + b
Run Code Online (Sandbox Code Playgroud)
app.settings中的调度程序
CELERYBEAT_SCHEDULE = {
'test_task': {
'task': 'home.tasks.test',
'schedule': crontab(minute='*/1'),
},
Run Code Online (Sandbox Code Playgroud) 我一直在运行一个带有芹菜工人的烧瓶应用程序,并且在三个独立的码头集装箱中没有任何问题.
这就是我开始它的方式:
celery worker -A app.controller.engine.celery -l info --concurrency=2 --pool eventlet
芹菜开始很好:
-------------- celery@a828bd5b0089 v4.2.1 (windowlicker)
---- **** -----
--- * *** * -- Linux-4.9.93-linuxkit-aufs-x86_64-with 2018-11-15 16:06:59
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: app.controller.engine:0x7f8ba4eb70b8
- ** ---------- .> transport: redis://redis:6379/0
- ** ---------- .> results: redis://redis:6379/1
- *** --- * --- .> concurrency: 2 (eventlet)
-- ******* ---- .> task events: ON
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks] …Run Code Online (Sandbox Code Playgroud) 如何测试是否仍在芹菜中处理任务(task_id)?我有以下场景:
有任何想法吗?可以查找芹菜正在处理的所有任务并检查我的是否仍然存在?
乍一看,我非常喜欢Celery中的"批量"功能,因为我需要在调用API之前对一定数量的ID进行分组(否则我可能会被踢掉).
不幸的是,在稍微测试一下时,批处理任务似乎与其他Canvas原语(在本例中为链)不能很好地兼容.例如:
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
a.backend.mark_as_done(request.id, 42, request=request)
print "filter_by_price " + str([r.args[0] for r in requests])
@a.task
def completed():
print("complete")
Run Code Online (Sandbox Code Playgroud)
因此,通过这个简单的工作流程
chain(get_price.s("ID_1"), completed.si()).delay()
Run Code Online (Sandbox Code Playgroud)
我看到这个输出:
[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] celery@ultra ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']
Run Code Online (Sandbox Code Playgroud)
5秒后,filter_by_price()会像预期的那样被触发.问题是completed()永远不会被调用.
有什么想法可以在这里发生?如果不使用批次,什么可能是一个解决这个问题的好方法?
PS:我已经CELERYD_PREFETCH_MULTIPLIER=0像文档说的那样设置了.
celery-task ×10
celery ×9
python ×6
flask ×2
rabbitmq ×2
airflow ×1
celerybeat ×1
celeryd ×1
django ×1
python-2.7 ×1
queue ×1
redis ×1
task-queue ×1