Tags: celery, celery-task, airflow, airflow-scheduler
I have just set up Airflow with the CeleryExecutor. Here is the skeleton of my DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# default_args is defined elsewhere in the skeleton
dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t1)
I have 2 workers. One of them serves only a single queue called local_queue, and the other serves two queues, local_queue and test_queue.
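For reference, the two workers in a setup like this would typically be started with commands along these lines (the comma-delimited form of -q for serving multiple queues; queue names match the DAG above):

# Worker 1: serves only local_queue
airflow worker -q local_queue

# Worker 2: serves both queues
airflow worker -q local_queue,test_queue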
I want task 1 to run on one machine, but tasks 2 and 3 to run on both machines. That is: run t2 and t3 on worker 1, which serves only local_queue, and run all 3 (t1, t2 and t3) on worker 2, which serves both local_queue and test_queue. The total number of task runs should be 5.
However, when I run it, only 3 tasks run: 1) print_date runs on worker 2 (which is correct), 2) print_host runs only on worker 1 (incorrect: it should run on both workers), and 3) print_uptime runs only on worker 2 (also incorrect: it should run on both workers).
Could you guide me on how to set this up so that 5 tasks run? In production, I want to manage machines by grouping them into queues: all machines with QUEUE_A do X, all machines with QUEUE_B do Y, and so on.
Thanks.
Instead of having one worker serve two queues, have each worker serve a single queue. A Celery queue load-balances across its consumers rather than broadcasting, so each queued task instance is picked up by exactly one worker; to run the same work on both machines you need one copy of the task per queue. The worker commands should therefore look like this:
airflow worker -q test_queue
airflow worker -q local_queue
Then define two copies of the same tasks, but in different queues:
dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

# Copy of print_host routed to the other queue
t3_2 = BashOperator(
    task_id='print_host_2',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

# Copy of print_uptime routed to the other queue
t2_2 = BashOperator(
    task_id='print_uptime_2',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

# Both uptime copies wait on print_date and on both host copies
t2.set_upstream(t3)
t2.set_upstream(t3_2)
t2.set_upstream(t1)
t2_2.set_upstream(t3)
t2_2.set_upstream(t3_2)
t2_2.set_upstream(t1)
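If you later need this pattern across more tasks or more queues, writing each copy by hand gets repetitive. Here is a minimal sketch of the same idea built in a loop; the queue list and the task-id naming scheme are just illustrative choices, not anything Airflow requires:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# default_args as before
dag = DAG('dummy_for_testing', default_args=default_args)

queues = ['local_queue', 'test_queue']

# print_date stays on test_queue only, as before
t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

# One copy of print_host and print_uptime per queue
host_tasks = []
uptime_tasks = []
for q in queues:
    host_tasks.append(BashOperator(
        task_id='print_host_' + q,
        bash_command='hostname >> /tmp/dag_output.log',
        queue=q,
        dag=dag))
    uptime_tasks.append(BashOperator(
        task_id='print_uptime_' + q,
        bash_command='uptime >> /tmp/dag_output.log',
        queue=q,
        dag=dag))

# Every uptime copy waits on print_date and on every host copy
for up in uptime_tasks:
    up.set_upstream(t1)
    for host in host_tasks:
        up.set_upstream(host)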