小编J.B*_*ers的帖子

气流1.9.0 ExternalTask​​Sensor Soft_Fail =没有任何重试行为

我正在开发依赖于另一个DAG的DAG。因此,我正在使用ExternalTask​​Sensor。但是,在使用此Sensor时,我注意到一些奇怪的行为。

如果将soft_fail参数设置为True(如果任务失败,它将状态设置为“跳过”而不是失败),则该任务将永远不会重试。虽然我希望任务重试指定次数(通过retries参数)。如果将soft_fail参数设置为False,它将重试。请参阅下面的DAG最小示例。

我想念什么吗?

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta


dag_name = 'soft_fail_example'
schedule_interval = "0 * * * *"
default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': datetime(2018, 1, 1),
            'email': [],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(seconds=30),
        }

test_dag = DAG(dag_name, default_args=default_args, schedule_interval=schedule_interval, 
catchup=False, max_active_runs=1)


ets = ExternalTaskSensor(task_id="test_external_task_sensor", dag=test_dag, soft_fail=True, 
timeout=10, retries=5, poke_interval=1, external_dag_id="dependent_dag_id", 
external_task_id="dependent_task_id")

dummy_task = DummyOperator(task_id="collection_task", dag=test_dag)

dummy_task << ets
Run Code Online (Sandbox Code Playgroud)

python airflow airflow-scheduler

5
推荐指数
0
解决办法
1454
查看次数

Airflow 1.9.0 ExternalTask​​Sensor retry_delay = 30产生TypeError:无法pickle _thread.RLock对象

正如标题所说; 在Airflow 1.9.0中如果你使用带有ExternalTask​​Sensor的retry_delay = 30(或任何其他数字)参数,DAG将运行正常,直到你想清除气流GUI中的任务实例 - >它将返回以下错误:"TypeError:不能pickle _thread.RLock对象"(和一个很好的Oops消息)但是如果你使用retry_delay = timedelta(seconds = 30)清除任务实例工作正常.

如果我查看models.py方法,深度检查应该没问题,所以对我来说这似乎很奇怪.我错过了什么,或者这是一个错误?

您可以在下面找到最小的DAG示例.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta


dag_name = 'soft_fail_example'
schedule_interval = "0 * * * *"
default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': datetime(2018, 1, 1),
            'email': [],
            'email_on_failure': False,
            'email_on_retry': False
        }

test_dag = DAG(dag_name, default_args=default_args, schedule_interval=schedule_interval, 
catchup=False, max_active_runs=1)


ets = ExternalTaskSensor(task_id="test_external_task_sensor", dag=test_dag, soft_fail=False, 
timeout=10, retries=0, poke_interval=1, retry_delay=30, external_dag_id="dependent_dag_id",
                         external_task_id="dependent_task_id")

dummy_task = DummyOperator(task_id="collection_task", …
Run Code Online (Sandbox Code Playgroud)

python airflow airflow-scheduler

1
推荐指数
1
解决办法
1507
查看次数

标签 统计

airflow ×2

airflow-scheduler ×2

python ×2