I have a list that I loop over to create tasks. The size of the list is static.
for counter, account_id in enumerate(ACCOUNT_LIST):
    task_id = f"bash_task_{counter}"
    if account_id:
        trigger_task = BashOperator(
            task_id=task_id,
            bash_command="echo hello there",
            dag=dag)
    else:
        trigger_task = BashOperator(
            task_id=task_id,
            bash_command="echo hello there",
            dag=dag)
        trigger_task.status = SKIPPED  # is there a way to somehow set the status of this to skipped instead of having a branch operator?
    trigger_task
I tried this manually, but could not get the task to be skipped:
start = DummyOperator(task_id='start')
task1 = DummyOperator(task_id='task_1')
task2 = DummyOperator(task_id='task_2')
task3 = DummyOperator(task_id='task_3')
task4 = DummyOperator(task_id='task_4')
start >> task1
start >> task2
try:
    start >> task3
    raise AirflowSkipException
except AirflowSkipException as ase:
    log.error('Task Skipped for task3')
try:
    start >> task4
    raise AirflowSkipException
except AirflowSkipException as ase:
    log.error('Task Skipped for task4')
小智 10
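Yes, you need to raise AirflowSkipException. Raise it from the code the task itself runs (for example, a PythonOperator callable), not while the DAG file is being parsed: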
from airflow.exceptions import AirflowSkipException
raise AirflowSkipException
See the source code for more information.
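As a rough sketch of how this could apply to the loop in the question: the exception is raised inside the callable that runs when the task executes, so entries without an account id end up skipped. The names `process_account`, `build_tasks`, the `python_task_` ids, and the sample `ACCOUNT_LIST` values are hypothetical, the `PythonOperator` import path assumes Airflow 2.x, and the fallback exception class is only there so the callable can be tried without Airflow installed.

```python
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in so the callable can be exercised without Airflow installed.
    class AirflowSkipException(Exception):
        pass

# Hypothetical sample data; the question does not show ACCOUNT_LIST.
ACCOUNT_LIST = ["acct_a", None, "acct_c"]


def process_account(account_id):
    """Task body: runs when the task instance executes, not at DAG parse time."""
    if not account_id:
        # Raised at run time, so Airflow marks this task instance as skipped.
        raise AirflowSkipException("no account id, skipping")
    return f"processed {account_id}"


def build_tasks(dag):
    """Create one PythonOperator per list entry (Airflow 2.x import path assumed)."""
    from airflow.operators.python import PythonOperator

    return [
        PythonOperator(
            task_id=f"python_task_{counter}",
            python_callable=process_account,
            op_kwargs={"account_id": account_id},
            dag=dag,
        )
        for counter, account_id in enumerate(ACCOUNT_LIST)
    ]
```

Calling `build_tasks(dag)` from the DAG file would create one task per entry; the tasks whose account id is empty should then show up as skipped once they run, without a branch operator.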