我正在开发依赖于另一个DAG的DAG。因此,我正在使用ExternalTaskSensor。但是,在使用此Sensor时,我注意到一些奇怪的行为。
如果将soft_fail参数设置为True(如果任务失败,它将状态设置为“跳过”而不是失败),则该任务将永远不会重试。虽然我希望任务重试指定次数(通过retries参数)。如果将soft_fail参数设置为False,它将重试。请参阅下面的DAG最小示例。
我想念什么吗?
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta
dag_name = 'soft_fail_example'
schedule_interval = "0 * * * *"
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email': [],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=30),
}
test_dag = DAG(dag_name, default_args=default_args, schedule_interval=schedule_interval,
catchup=False, max_active_runs=1)
ets = ExternalTaskSensor(task_id="test_external_task_sensor", dag=test_dag, soft_fail=True,
timeout=10, retries=5, poke_interval=1, external_dag_id="dependent_dag_id",
external_task_id="dependent_task_id")
dummy_task = DummyOperator(task_id="collection_task", dag=test_dag)
dummy_task << ets
Run Code Online (Sandbox Code Playgroud) 正如标题所说; 在Airflow 1.9.0中如果你使用带有ExternalTaskSensor的retry_delay = 30(或任何其他数字)参数,DAG将运行正常,直到你想清除气流GUI中的任务实例 - >它将返回以下错误:"TypeError:不能pickle _thread.RLock对象"(和一个很好的Oops消息)但是如果你使用retry_delay = timedelta(seconds = 30)清除任务实例工作正常.
如果我查看models.py方法,深度检查应该没问题,所以对我来说这似乎很奇怪.我错过了什么,或者这是一个错误?
您可以在下面找到最小的DAG示例.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta
dag_name = 'soft_fail_example'
schedule_interval = "0 * * * *"
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email': [],
'email_on_failure': False,
'email_on_retry': False
}
test_dag = DAG(dag_name, default_args=default_args, schedule_interval=schedule_interval,
catchup=False, max_active_runs=1)
ets = ExternalTaskSensor(task_id="test_external_task_sensor", dag=test_dag, soft_fail=False,
timeout=10, retries=0, poke_interval=1, retry_delay=30, external_dag_id="dependent_dag_id",
external_task_id="dependent_task_id")
dummy_task = DummyOperator(task_id="collection_task", …Run Code Online (Sandbox Code Playgroud)