气流 on_failure_call_back 现在持续运行

lil*_*lil 1 python airflow airflow-scheduler

我想做一个松弛警报(失败状态和成功状态)

Dag 现在正在工作,当状态为 sucssess 状态时,松弛警报正在工作!但现在, on_failure_callback - 失败状态持续工作(每 1 分钟一次)

请注意,它继续失败。但是,它不起作用,我认为这不是真正的状态。

我该如何改变呢?...我想知道关于真正失败状态的松弛通知

现在我们的 task_default 参数是这样的。

dt = datetime.now(tz=tz.tzlocal())
task_default_args = {
    'owner': 'owner',
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2018, 11, 10),
    #'depends_on_past': False,
    'email': ['mail'],
    'email_on_failure': True,
    'email_on_retry': False,
    'on_failure_callback': send_slack(
        senderRole='airflow',
        receiverSubscribe='bot',
        level='info',
        text='= fail' + str(dt),
        X_CAG_AUTH='AG_CONSUMER_TOKEN access-key=500000000000',
    ),
    'execution_timeout': timedelta(minutes=30)
}

-- > Dag Contents like this 


start = DummyOperator(
    task_id='start',
    dag=dag)

tmp_slack_test_dag = PostgresOperator(pool=redshift_pool,
                      task_id='tmp_slack_test_sql',
                      postgres_conn_id=redshift_conn_id,
                      sql="""sql/tmp_.sql""",
                      parameters=None,
                      autocommit=True,
                      dag=dag
                    )

success_dummy = DummyOperator(
    task_id='success_dummy',
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS
)

alert_success_task = PythonOperator(
    task_id='alert_success',
    python_callable=lambda: send_slack(
        senderRole='airflow',
        receiverSubscribe='bot',
        level='info',
        text='success'+str(dt),
        X_CAG_AUTH='AG_CONSUMER_TOKEN access-key=500000000000'
    ),
    #depends_on_past=True,
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

start >> tmp_slack_test_dag >> success_dummy >> alert_success_task >> end
Run Code Online (Sandbox Code Playgroud)

kax*_*xil 5

那是因为您需要将函数传递给on_failure_callback而不是函数的输出。

将其更改为以下 ie 将您的警报功能分开,并将该功能的名称传递给 on_failure_callback

def slack_failed_task(contextDictionary, **kwargs):  
       failed_alert = SlackAPIPostOperator(
         task_id='slack_failed',
         channel="#datalabs",
         token="...",
         text = ':red_circle: DAG Failed',
         owner = '_owner',)
         return failed_alert.execute


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)