Dar*_*hta 5 python directed-acyclic-graphs airflow airflow-scheduler
我已经看到了这和这对SO问题,并作出相应的更改。但是,我的依赖DAG仍然卡在戳状态。以下是我的主DAG:
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
today = datetime.today()
default_args = {
'depends_on_past': False,
'retries': 0,
'start_date': datetime(today.year, today.month, today.day),
'schedule_interval': '@once'
}
dag = DAG('call-procedure-and-bash', default_args=default_args)
call_procedure = JdbcOperator(
task_id='call_procedure',
jdbc_conn_id='airflow_db2',
sql='CALL AIRFLOW.TEST_INSERT (20)',
dag=dag
)
call_procedure
Run Code Online (Sandbox Code Playgroud)
以下是我的依赖DAG:
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor
today = datetime.today()
default_args = {
'depends_on_past': False,
'retries': 0,
'start_date': datetime(today.year, today.month, today.day),
'schedule_interval': '@once'
}
dag = DAG('external-dag-upstream', default_args=default_args)
task_sensor = ExternalTaskSensor(
task_id='link_upstream',
external_dag_id='call-procedure-and-bash',
external_task_id='call_procedure',
execution_delta=timedelta(minutes=-2),
dag=dag
)
count_rows = JdbcOperator(
task_id='count_rows',
jdbc_conn_id='airflow_db2',
sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
dag=dag
)
count_rows.set_upstream(task_sensor)
Run Code Online (Sandbox Code Playgroud)
一旦执行主DAG,以下是从属DAG的日志:
[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
Run Code Online (Sandbox Code Playgroud)
以下是主DAG执行的日志:
[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0
Run Code Online (Sandbox Code Playgroud)
我的假设是,如果主机运行正常,Airflow应该触发依赖的DAG吗?我试着玩,execution_delta但这似乎不起作用。
此外,schedule_interval与start_date相同的两个的DAG,所以不要认为应该引起任何麻烦。
我有什么想念的吗?
您可能应该使用正时间增量:https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html,因为在减去执行增量时,它最终会寻找一个任务比它自己跑了2分钟。
然而,增量并不是真正的范围,TI 必须具有匹配的 Dag ID、任务 ID、成功结果以及日期时间列表中的执行日期。当您给出execution_delta增量时,它是一个日期时间的列表,采用当前执行日期并减去时间增量。
这可能取决于您删除时间增量,以便两个执行日期匹配并且传感器将等待直到另一个任务成功,或者您的开始日期和计划间隔基本上设置为今天并且获取的@once执行日期不处于可预测的锁定状态- 彼此并肩。您可以尝试设置 saydatetime(2019,1,10)和0 1 * * *让它们每天凌晨 1 点运行(同样没有execution_delta)。
| 归档时间: |
|
| 查看次数: |
1781 次 |
| 最近记录: |