Posts by tls*_*skr

How do I mock a PythonOperator's python_callable and on_failure_callback?

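What do I mock, and where, with an Airflow PythonOperator, such that:

  • the python_callable raises an exception, triggering the call to on_failure_callback, and
  • I can test whether that callback is called, and with what arguments?

I have tried mocking both the python_callable and PythonOperator.execute in a number of places, without success.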
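To make the goal concrete, here is a minimal sketch that does not use Airflow at all. `run_task` is a hypothetical stand-in for whatever runs the task and handles its failure; it is an assumption for illustration, not an Airflow API. It only shows the shape of the check being asked for: a mocked callable that raises through `side_effect`, and a mocked callback whose arguments are inspected afterwards.

```python
from unittest import mock


class CustomException(Exception):
    pass


def run_task(python_callable, on_failure_callback):
    """Hypothetical stand-in for a task runner (NOT an Airflow API)."""
    try:
        return python_callable()
    except Exception as exc:
        # Hand the error to the callback in a context dict, then re-raise.
        on_failure_callback({"exception": exc})
        raise


def check_callback_called():
    # The mocked callable raises as soon as it is called.
    failing_callable = mock.Mock(side_effect=CustomException("table is empty"))
    callback = mock.Mock()

    try:
        run_task(failing_callable, callback)
    except CustomException:
        pass  # the failure is expected here

    # The callback must have been called exactly once, with one positional arg.
    callback.assert_called_once()
    (context,), _kwargs = callback.call_args
    return context


context = check_callback_called()
```

In a real test, the open question is which Airflow code path plays the role of `run_task`, since that is the layer that has to run for the callback to fire.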

The code files look like this:

dags/my_code.py

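# Import path as used in Airflow 1.10
from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook

# OurSqlAlchemyTable and slack_conn_id are defined elsewhere in the project.

class CustomException(Exception):
    pass

def a_callable():
    # Fail the task when the table has no rows.
    if OurSqlAlchemyTable.count() == 0:
        raise CustomException("{} is empty".format(OurSqlAlchemyTable.name))
    return True

def a_failure_callable(context):
    # Post the failure message to Slack.
    SlackWebhookHook(
        http_conn_id=slack_conn_id,
        message=str(context['exception']),  # a plain Exception has no .msg attribute
        channel='#alert-channel'
    ).execute()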

dags/a_dag.py

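from airflow.operators.python_operator import PythonOperator

from my_code import a_callable, a_failure_callable

# `dag` is the DAG object whose dag_id is 'dag-named-sue', defined elsewhere.
new_task = PythonOperator(
    task_id='new-task', dag=dag, conn_id='a_conn_id', timeout=30,
    python_callable=a_callable,
    on_failure_callback=a_failure_callable)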

dags/test_a_dag.py

import unittest
from unittest.mock import patch

from airflow.models import DagBag

from my_code import CustomException

class TestCallback(unittest.TestCase):

    def test_on_failure_callback(self):
        tested_task = DagBag().get_dag('dag-named-sue').get_task('new-task')

        with patch('airflow.operators.python_operator.PythonOperator.execute') as mock_execute:
            with patch('dags.a_dag.a_failure_callable') as mock_callback:
                mock_execute.side_effect = CustomException
                tested_task.execute(context={})

            # does failure of …
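One detail of the test above can be illustrated without Airflow, and may be relevant here: an object that is handed a function at construction keeps its own reference to it, so patching the module-level name afterwards does not change what the object holds. The sketch below uses a hypothetical `FakeOperator` class and a `SimpleNamespace` standing in for the DAG module (both are assumptions for illustration, not Airflow's `PythonOperator` or a real module), and contrasts that with `patch.object` on the instance attribute.

```python
import types
from unittest import mock


def a_failure_callable(context):
    return "real callback"


class FakeOperator:
    """Hypothetical stand-in, NOT Airflow's PythonOperator."""

    def __init__(self, on_failure_callback):
        # The operator stores its own reference to the function.
        self.on_failure_callback = on_failure_callback


# Stand-in for the module that defines the task (e.g. a DAG file).
fake_dag_module = types.SimpleNamespace(a_failure_callable=a_failure_callable)
task = FakeOperator(on_failure_callback=fake_dag_module.a_failure_callable)

# Patching the module-level name leaves the task's stored reference untouched,
# so the mock never sees the call.
with mock.patch.object(fake_dag_module, "a_failure_callable") as module_mock:
    task.on_failure_callback({"exception": ValueError("boom")})
    name_patch_saw_call = module_mock.called

# Patching the attribute on the task object itself intercepts the call.
with mock.patch.object(task, "on_failure_callback") as attr_mock:
    task.on_failure_callback({"exception": ValueError("boom")})
    attr_patch_saw_call = attr_mock.called
```

If the same applies to a task loaded through `DagBag`, patching `on_failure_callback` on the retrieved task object would be the analogous move, though that alone does not make `execute` invoke the callback.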

python unit-testing mocking airflow

5 votes · 0 answers · 989 views
