气流:如何在多个工作人员上运行任务

Muk*_*ain 2 celery celery-task airflow airflow-scheduler

我刚刚设置了芹菜执行器的气流,这是我的DAG的骨架

dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

我有2个工人.其中一个只运行一个被调用local_queue的队列,另一个运行两个名为的队列local_queue,test_queue

我想在一台机器上运行任务1,但在两台机器上运行任务2和3.即,在仅运行local_queue的worker 1上运行t2和t3,在运行local_queue和test_queue的worker 2上运行所有3(t1,t2和t3).任务运行总数应为5.

但是,当我运行它时,只运行3个任务.1)print_date为worker 2运行(这是正确的)2)print_host仅针对worker 1运行(不正确.应该为两个worker运行)和3)print_uptime仅针对worker 2运行(也是不正确的.应该为两个worker运行)

能否指导我如何设置它以便运行5个任务.在生产中,我想通过将机器分组到队列中以及所有具有QUEUE_A - > do X且所有机器具有QUEUE_B - > do Y等的机器来管理机器.

谢谢

jhn*_*lvr 5

而不是让一个工作人员工作2个队列,让每个工作人员工作一个队列.所以worker命令应如下所示:

airflow worker -q test_queue
airflow worker -q local_queue
Run Code Online (Sandbox Code Playgroud)

然后有两个相同的任务,但在不同的queus.

dag = DAG('dummy_for_testing', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t3_2 = BashOperator(
    task_id='print_host_2',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t2 = BashOperator(
    task_id='print_uptime',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='local_queue',
    dag=dag)

t2_2 = BashOperator(
    task_id='print_uptime_2',
    bash_command='uptime >> /tmp/dag_output.log',
    queue='test_queue',
    dag=dag)

t2.set_upstream(t3)
t2.set_upstream(t3_2)
t2.set_upstream(t1)

t2_2.set_upstream(t3)
t2_2.set_upstream(t3_2)
t2_2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

  • 气流不是这样工作的。如果您的任务在一台主机上失败但在另一台主机上没有失败怎么办?当您需要查看日志时会发生什么?查看日志是基于每个任务的,那么您正在查看什么日志:主机 9 还是主机 10?您可以在迭代队列数组时在 for 循环中创建这些任务,这样,如果您添加一个新的工作线程,那么您只需将一个新队列添加到 DAG 顶部的数组中即可。 (2认同)