Airflow:当on_success_callback执行带参数的函数时

Oma*_*r14 4 python scheduler scheduled-tasks airflow

我想使用从任务传递的一个参数来执行一个函数。

这是我的带有状态参数的函数:

def sns_notify(state):
    client = boto3.client('sns')
    if state == "failed":
        message = config.get('sns', 'message') + state
    else:
        message = config.get('sns', 'message') + state
    response = client.publish(TargetArn=config.get('sns', 'target_arn'),
                              Message=message,
                              Subject=config.get('sns', 'subject'))
    return response
Run Code Online (Sandbox Code Playgroud)

这是我的任务,状态为参数:

t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
                   on_success_callback=sns_notify("ok"), dag=dag)

t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
                   on_success_callback=sns_notify("failed"), dag=dag)
Run Code Online (Sandbox Code Playgroud)

当我运行 dag 时,该函数不会停止发送邮件(对于本例)

hoj*_*oju 5

每次由气流加载 DAG 时,它都会执行,sns_notify("ok")因为您正在调用该函数。您只需传递sns_notify将接收的函数指针context。请参阅文档: https: //airflow.apache.org/code.html

trigger_rule与依赖任务的执行方式相关,因此与on_success_callback.

然而,我不确定如何将变量传递给这个回调 - 来这里寻找答案!