我用 Airflow 模拟什么和在哪里PythonOperator,这样:
python_callback引发了异常,引发的通话on_failure_callback,并我曾尝试PythonOperator.execute在许多地方模拟 {python_callable} 和,但没有成功。
代码文件如下所示:
dags/my_code.py
class CustomException(Exception): pass
def a_callable():
if OurSqlAlchemyTable.count() == 0:
raise CustomException("{} is empty".format(OurSqlAlchemyTable.name))
return True
def a_failure_callable(context):
SlackWebhookHook(
http_conn_id=slack_conn_id,
message= context['exception'].msg,
channel='#alert-channel'
).execute()
Run Code Online (Sandbox Code Playgroud)
dags/a_dag.py
from my_code import a_callable, a_failure_callable
new_task = PythonOperator(
task_id='new-task', dag=dag-named-sue, conn_id='a_conn_id', timeout=30,
python_callable=a_callable,
on_failure_callback=a_failure_callable)
Run Code Online (Sandbox Code Playgroud)
dags/test_a_dag.py
class TestCallback(unittest.TestCase):
def test_on_failure_callback(self):
tested_task = DagBag().get_dag('dag-named-sue').get_task('new-task')
with patch('airflow.operators.python_operator.PythonOperator.execute') as mock_execute:
with patch('dags.a_dag.a_failure_callable') as mock_callback:
mock_execute.side_effect = CustomException
tested_task.execute(context={})
# does failure of …Run Code Online (Sandbox Code Playgroud)