我有一个包含许多子任务的 DAG。在 DAG 的中间,有一个验证任务,根据任务的结果/返回代码,我想采用两条不同的路径。如果成功,将遵循一条路线(一系列任务),如果失败,我们将执行一组不同的任务。当前方法有两个问题,一是如果退出代码为 1,验证任务将执行多次(根据配置的重试次数)。二是无法采取不同的执行分支
为了解决问题 1,我们可以使用任务实例中可用的重试次数,它可以通过宏 {{ task_instance }} 获得。感谢有人可以为我们指出更简洁的方法,而采取不同路径的问题 2 仍未解决。
您可以retries在任务级别拥有。
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
retries=3,
dag=dag,
)
run_this_last = DummyOperator(
task_id='run_this_last',
retries=1,
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
关于你的第二个问题,有一个Branching的概念。
该BranchPythonOperator很像PythonOperator只是它需要一个
python_callable一个返回TASK_ID(或task_ids的列表)。遵循返回的 task_id,并跳过所有其他路径。Python 函数返回的 task_id 必须直接引用任务下游的BranchPythonOperator任务。
示例 DAG:
import random
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='example_branch_operator',
default_args=args,
schedule_interval="@daily",
)
run_this_first = DummyOperator(
task_id='run_this_first',
dag=dag,
)
options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
branching = BranchPythonOperator(
task_id='branching',
python_callable=lambda: random.choice(options),
dag=dag,
)
run_this_first >> branching
join = DummyOperator(
task_id='join',
trigger_rule='one_success',
dag=dag,
)
for option in options:
t = DummyOperator(
task_id=option,
dag=dag,
)
dummy_follow = DummyOperator(
task_id='follow_' + option,
dag=dag,
)
branching >> t >> dummy_follow >> join
Run Code Online (Sandbox Code Playgroud)
关于您的第一个问题,您可以很轻松地设置任务/操作员特定的重试选项。参考:baseoperator.py#L77。
问题二,您可以使用 DAG 轻松分支BranchPythonOperator(示例用法:example_branch_operator.py)。您将希望将验证任务/逻辑嵌套在BranchPythonOperator(您可以在运算符中定义和执行运算符)。
| 归档时间: |
|
| 查看次数: |
5194 次 |
| 最近记录: |