我找到以下链接:
https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
这确实解释了如何使用TriggerDagRunOperator来执行单独的Airflow dag.该文档使用Airflow自己的示例dags,但我很难理解那些因为他们没有使用任何传感器.
有人可以解释如何使用TriggerDagRunOperator和开始单独的dag SqlSensor?当我的SQL Server作业任务完成时,我正在尝试启动单独的DAG.我知道如何通过使用来检查SQL Server作业的状态SqlSensor,但我不知道如何将结果附加TriggerDagRunOperator到启动单独的DAG.
我不想使用Airflow CLI或在一个DAG中执行这两项任务.基本上,我希望这只是触发dag.
下面是我目前的代码,它缺少关键 conditionally_trigger
# File Name: check-when-db1-sql-task-is-done
from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime
default_args = {
'owner': 'airflow',
'retry_delay': timedelta(minutes=5),
}
dag = DAG('check-when-db1-sql-task-is-done',
description='Check-when-DB1-SQL-task-is-done',
default_args=default_args,
schedule_interval='@once',
start_date=datetime.now(),
)
# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor (
task_id='sql-sensor',
poke_interval=30,
timeout=3200,
sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date …Run Code Online (Sandbox Code Playgroud)