sia*_*sia 8 airflow airflow-scheduler
我想在另一个dag完成后开始一个dag.一个解决方案是使用外部传感器功能,下面你可以找到我的解决方案 我遇到的问题是依赖的dag卡在戳,我检查了这个答案 ,并确保两个dags运行在相同的时间表,我的简化代码如下:任何帮助将不胜感激.领导者dag:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
schedule = '* * * * *'
dag = DAG('leader_dag', default_args=default_args,catchup=False,
schedule_interval=schedule)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
依赖的dag:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 10, 8),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
schedule='* * * * *'
dag = DAG('dependent_dag', default_args=default_args, catchup=False,
schedule_interval=schedule)
wait_for_task = ExternalTaskSensor(task_id = 'wait_for_task',
external_dag_id = 'leader_dag', external_task_id='t1', dag=dag)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t1.set_upstream(wait_for_task)
Run Code Online (Sandbox Code Playgroud)
依赖dag的日志:
首先,task_id中的leader_dag名为,print_date但您设置dependent_dag了一个wait_for_task正在等待leader_dag名为的任务的任务t1。没有名为的任务t1。在py文件中分配给它的内容无关紧要,也没有在气流数据库中使用,也没有被传感器横向使用。它应该在等待任务名称print_date。
其次,您的日志不与您要显示的dependent_dag正在等待的leader_dag运行顺序一致。
最后,我不建议您使用Airflow每分钟安排任务。当然不是两个从属任务在一起。考虑在诸如Spark的其他系统中编写流作业,或为此滚动自己的Celery或Dask环境。
您还可以ExternalTaskSensor通过TriggerDagRunOperator在Leader_dag的末尾添加a 来触发dependent_dag,并通过将设置为schedule_interval来从中删除计划,来避免这种情况None。
我在您的日志中看到的是2018年10月13日T19:08:11的领导者日志。充其量这将是执行时间为dated_date 2018-10-13 19:07:00的dagrun,因为从19:07开始的分钟周期在19:08结束,这是可以安排的最早时间。如果是这种情况,我会发现在计划和执行之间会有大约11秒的延迟。但是,Airflow中可能会有几分钟的计划延迟。
我还从中看到一条日志,该日志从dependent_dag19:14:04到19:14:34运行,并且正在寻找相应的19:13:00 dagrun的完成。没有迹象表明您的调度程序有足够的时滞,可以在19:14:34之前启动19:13:00 leader_dagdagrun。如果您显示它戳了5分钟左右,您最好说服我。当然,它永远不会感觉到Leader_dag.t1,因为这不是您命名的所示任务。
因此,Airflow具有调度延迟,如果您的系统中有几千个dag,则可能会超过1分钟,因此a catchup=False会让您互相跟随IE 19:08、19:09和可能会发生一些跳过一分钟(或6)的运行,例如19:10,然后是19:16,并且由于延迟是基于dag-dag的,所以它是随机的,因此传感器可能一直处于未对准状态,一直等待着,即使您有正确的任务ID可以等待:
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='leader_dag',
- external_task_id='t1',
+ external_task_id='print_date',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1727 次 |
| 最近记录: |