我正在尝试使用ExternalTaskSensor,并且它已经陷入了另一个已经成功完成的DAG任务.
这里,第一个DAG"a"完成其任务,之后应该触发通过ExternalTaskSensor的第二个DAG"b".相反,它陷入了寻找a.first_task的困境.
第一个DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
第二个DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >> \
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
我在这里错过了什么?
jhn*_*lvr 15
ExternalTaskSensor 假定您依赖于具有相同执行日期的dag运行中的任务.
这意味着在您的情况下a,b需要按照相同的时间表运行(例如,每天上午9:00或w/e).
否则,您需要使用execution_delta或execution_date_fn实例化时ExternalTaskSensor.
以下是运营商内部的文档,以帮助进一步澄清:
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
execution_delta or execution_date_fn can be passed to
ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
and returns the desired execution date to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
Run Code Online (Sandbox Code Playgroud)
tom*_*mcm 15
为了澄清我在此处和其他相关问题上看到的内容,dag 不一定必须按照已接受的答案中所述的时间表运行。dags 也不需要具有相同的start_date. 如果您创建的ExternalTaskSensor任务没有使用execution_delta或execution_date_fn,则两个 dag 需要具有相同的执行日期。碰巧的是,如果两个 dag 具有相同的计划,则每个间隔中的计划运行将具有相同的执行日期。我不确定手动触发的预定 dag 运行的执行日期是什么。
为了让这个例子起作用, dagb的ExternalTaskSensor任务需要一个execution_deltaorexecution_date_fn参数。如果使用execution_delta参数,则应该是b的执行日期 - execution_delta=a的执行日期。如果使用execution_date_fn,则该函数应返回a的执行日期。
如果您使用的是TriggerDagRunOperator,然后使用 anExternalTaskSensor来检测该 dag 何时完成,您可以执行一些操作,例如将主 dag 的执行日期传递给带有TriggerDagRunOperator'sexecution_date参数的触发日期,例如execution_date='{{ execution_date }}'. 那么两个 dag 的执行日期将相同,并且您不需要每个 dag 的计划都相同,或者使用execution_delta或execution_date_fn传感器参数。
以上是在 Airflow 1.10.9 上编写和测试的