如何扩展 PythonOperator

zhu*_*gxy 5 python airflow

我正在尝试定制我的 PythonOperator 并将其放置在 $AIRFLOW_HOME/plugins 下,如下所示:

class MyPythonOperator(PythonOperator):

    def my_callable(param1, param2, param3):
        # do something

    @apply_defaults
    def __init__(self, task_id, *args, **kwargs):

        super(MyPythonOperator, self).__init__(
            task_id=task_id,  
            python_callable = self.my_callable,
            provide_context = True,
            *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

然后我定义了一个airflow dag代码,非常简单,只有两个任务:

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_workflow',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)


task1 = MyPythonOperator(
    task_id='task1',
    params={'param1': 'param1_value',
            'param2': 'param2_value',
            'param3': 'param3_value'},
    dag=dag
)

task2 = MyPythonOperator(
    task_id='task2',
    params={'param1': 'param1_value',
            'param2': 'param2_value',
            'param2': 'param3_value'},
    dag=dag
)

task1 >> task2
Run Code Online (Sandbox Code Playgroud)

但在我运行 dag python 代码后,收到错误消息:

$ python example_airflow_code.py
[2019-05-15 19:51:10,338] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: example_airflow_code.py [-h]
                               {list_tasks,backfill,test,run,pause,unpause,list_dag_runs}
                               ...
example_airflow_code.py: error: too few arguments
Run Code Online (Sandbox Code Playgroud)

我尝试了一些调试,并在这一行插入一个断点:

super(MyPythonOperator, self).__init__()

我发现在调用超级构造函数之前,self.dag和self.dag_id的值不正常,值为:

str: Traceback (most recent call last):
  File "/Applications/Eclipse.app/Contents/Eclipse/plugins/org.python.pydev.core_6.4.4.201807281807/pysrc/_pydevd_bundle/pydevd_resolver.py", line 166, in _getPyDictionary
    attr = getattr(var, n)
  File "/Users/zhuangxy/anaconda2/lib/python2.7/site-packages/airflow/models/__init__.py", line 2399, in dag_id
    return 'adhoc_' + self.owner
AttributeError: 'MyPythonOperator' object has no attribute 'owner'
Run Code Online (Sandbox Code Playgroud)

有人知道这个例子有什么问题吗?非常感谢!

小智 1

我最近也遇到了这个。您似乎缺少context自定义 PythonOperator 上的参数。

更改您的方法定义,使其看起来像这样:

def my_callable(param1, param2, param3, **context):
    # do something
Run Code Online (Sandbox Code Playgroud)

失败的原因是provide_context=True您在运算符中提供的标志。由于某种原因,Python 可调用函数正在您的参数中查找它。