气流外部传感器卡在外面

sia*_*sia 8 airflow airflow-scheduler

我想在另一个dag完成后开始一个dag.一个解决方案是使用外部传感器功能,下面你可以找到我的解决方案 我遇到的问题是依赖的dag卡在戳,我检查了这个答案 ,并确保两个dags运行在相同的时间表,我的简化代码如下:任何帮助将不胜感激.领导者dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
   'owner': 'airflow',
   'depends_on_past': False,
   'start_date': datetime(2015, 6, 1),
   'retries': 1,
   'retry_delay': timedelta(minutes=5),



 }

 schedule = '* * * * *'

 dag = DAG('leader_dag', default_args=default_args,catchup=False, 
 schedule_interval=schedule)

t1 = BashOperator(
   task_id='print_date',
   bash_command='date',
   dag=dag)
Run Code Online (Sandbox Code Playgroud)

依赖的dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),


}
schedule='* * * * *'
dag = DAG('dependent_dag', default_args=default_args, catchup=False, 
schedule_interval=schedule)

 wait_for_task = ExternalTaskSensor(task_id = 'wait_for_task', 
 external_dag_id = 'leader_dag', external_task_id='t1', dag=dag)

 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
      dag=dag)

 t1.set_upstream(wait_for_task)
Run Code Online (Sandbox Code Playgroud)

leader_dag的日志: 在此输入图像描述

依赖dag的日志:

在此输入图像描述

dla*_*lin 6

首先,task_id中的leader_dag名为,print_date但您设置dependent_dag了一个wait_for_task正在等待leader_dag名为的任务的任务t1。没有名为的任务t1。在py文件中分配给它的内容无关紧要,也没有在气流数据库中使用,也没有被传感器横向使用。它应该在等待任务名称print_date

其次,您的日志不与您要显示的dependent_dag正在等待的leader_dag运行顺序一致。

最后,我不建议您使用Airflow每分钟安排任务。当然不是两个从属任务在一起。考虑在诸如Spark的其他系统中编写流作业,或为此滚动自己的Celery或Dask环境。

您还可以ExternalTaskSensor通过TriggerDagRunOperator在Leader_dag的末尾添加a 来触发dependent_dag,并通过将设置为schedule_interval来从中删除计划,来避免这种情况None

我在您的日志中看到的是2018年10月13日T19:08:11的领导者日志。充其量这将是执行时间为dated_date 2018-10-13 19:07:00的dagrun,因为从19:07开始的分钟周期在19:08结束,这是可以安排的最早时间。如果是这种情况,我会发现在计划和执行之间会有大约11秒的延迟。但是,Airflow中可能会有几分钟的计划延迟。

我还从中看到一条日志,该日志从dependent_dag19:14:04到19:14:34运行,并且正在寻找相应的19:13:00 dagrun的完成。没有迹象表明您的调度程序有足够的时滞,可以在19:14:34之前启动19:13:00 leader_dagdagrun。如果您显示它戳了5分钟左右,您最好说服我。当然,它永远不会感觉到Leader_dag.t1,因为这不是您命名的所示任务。

因此,Airflow具有调度延迟,如果您的系统中有几千个dag,则可能会超过1分钟,因此a catchup=False会让您互相跟随IE 19:08、19:09和可能会发生一些跳过一分钟(或6)的运行,例如19:10,然后是19:16,并且由于延迟是基于dag-dag的,所以它是随机的,因此传感器可能一直处于未对准状态,一直等待着,即使您有正确的任务ID可以等待:

 wait_for_task = ExternalTaskSensor(
     task_id='wait_for_task', 
     external_dag_id='leader_dag',
-    external_task_id='t1',
+    external_task_id='print_date',
     dag=dag)
Run Code Online (Sandbox Code Playgroud)