有没有人知道airflow test在bash提示符下运行时是否有办法设置dag_run.conf参数?
例如,我从官方气流存储库下载了example_trigger_target_dag,我想测试该run_this任务.通常我会做以下事情:
~/$ airflow test example_trigger_target_dag run_this '2018-01-01'
但是运行它会产生错误:
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2018-05-02 10:50:01,154] {models.py:1342} INFO - Executing <Task(PythonOperator): run_this> on 2018-01-01 00:00:00
[2018-05-02 10:50:01,262] {models.py:1417} ERROR - 'NoneType' object has no attribute 'conf'
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 80, in execute
return_value = self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/annalect/uk_ds_airflow/dags/playpen/example_trigger_target_dag.py", line 56, in run_this_func
print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
AttributeError: 'NoneType' object has no attribute 'conf'
Run Code Online (Sandbox Code Playgroud)
我已经开始使用这个task_params参数但是我要么使用了错误的语法,要么它没有实现我所追求的,因为它产生了与上面相同的错误:
~/$ airflow test --task_params '{"kwargs": {"dag_run": {"conf": {"message": "Hey world"}}}}' example_trigger_target_dag run_this '2018-01-01'
[2018-05-02 11:10:58,065] {models.py:1441} INFO - Marking task as FAILED.
[2018-05-02 11:10:58,070] {models.py:1462} ERROR - 'NoneType' object has no attribute 'conf'
Run Code Online (Sandbox Code Playgroud)
那么有谁知道如何测试依赖于dag_run.conf值的任务?
谢谢!
--conf该airflow test命令没有选项,但您可以通过将参数传递给任务来解决此问题python_callable.
在callable中,如果kwargs['test_mode']设置了,你可以检索参数来构建一个虚拟DagRun对象,如下所示:
from airflow.models import DagRun
...
def run_this_func(ds, **kwargs):
if kwargs['test_mode']:
kwargs['dag_run'] = DagRun(conf=kwargs['params'])
print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
Run Code Online (Sandbox Code Playgroud)
要测试example_trigger_target_dag,只需:
airflow test example_trigger_target_dag test_trigger_dagrun "2018-01-01" -tp '{"message":"Hello world"}'
Run Code Online (Sandbox Code Playgroud)
你会得到:
Remotely received value of Hello world for key=message
Run Code Online (Sandbox Code Playgroud)
现在,您可以编写装饰器,而不是将测试代码放入任务中.此外,由于我们只是使用conf属性DagRun,我们也可以使用a SimpleNamespace.最后,为了避免在查找时出现潜在的键错误kwargs,我们可以使用get默认值.
from types import SimpleNamespace
def allow_conf_testing(func):
def wrapper(*args, **kwargs):
if kwargs.get('test_mode', False):
kwargs['dag_run'] = SimpleNamespace(conf=kwargs.get('params', {}))
func(*args, **kwargs)
return wrapper
@allow_conf_testing
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2670 次 |
| 最近记录: |