小编Ale*_*vev的帖子

气流ExternalTask​​Sensor卡住了

我正在尝试使用ExternalTask​​Sensor,并且它已经陷入了另一个已经成功完成的DAG任务.

这里,第一个DAG"a"完成其任务,之后应该触发通过ExternalTask​​Sensor的第二个DAG"b".相反,它陷入了寻找a.first_task的困境.

第一个DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_first_task():
    print('First task is done')

PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

第二个DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

我在这里错过了什么?

python airflow

10
推荐指数
2
解决办法
7393
查看次数

标签 统计

airflow ×1

python ×1