Oma*_*r14 4 python scheduler scheduled-tasks airflow
我想使用从任务传递的一个参数来执行一个函数。
这是我的带有状态参数的函数:
def sns_notify(state):
client = boto3.client('sns')
if state == "failed":
message = config.get('sns', 'message') + state
else:
message = config.get('sns', 'message') + state
response = client.publish(TargetArn=config.get('sns', 'target_arn'),
Message=message,
Subject=config.get('sns', 'subject'))
return response
Run Code Online (Sandbox Code Playgroud)
这是我的任务,状态为参数:
t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
on_success_callback=sns_notify("ok"), dag=dag)
t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
on_success_callback=sns_notify("failed"), dag=dag)
Run Code Online (Sandbox Code Playgroud)
当我运行 dag 时,该函数不会停止发送邮件(对于本例)
每次由气流加载 DAG 时,它都会执行,sns_notify("ok")因为您正在调用该函数。您只需传递sns_notify将接收的函数指针context。请参阅文档: https: //airflow.apache.org/code.html
trigger_rule与依赖任务的执行方式相关,因此与on_success_callback.
然而,我不确定如何将变量传递给这个回调 - 来这里寻找答案!