小编zhu*_*gxy的帖子

如何扩展 PythonOperator

我正在尝试定制我的 PythonOperator 并将其放置在 $AIRFLOW_HOME/plugins 下,如下所示:

class MyPythonOperator(PythonOperator):

    def my_callable(param1, param2, param3):
        # do something

    @apply_defaults
    def __init__(self, task_id, *args, **kwargs):

        super(MyPythonOperator, self).__init__(
            task_id=task_id,  
            python_callable = self.my_callable,
            provide_context = True,
            *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

然后我定义了一个airflow dag代码,非常简单,只有两个任务:

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_workflow',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)


task1 = MyPythonOperator(
    task_id='task1',
    params={'param1': 'param1_value',
            'param2': 'param2_value',
            'param3': 'param3_value'},
    dag=dag
)

task2 = MyPythonOperator(
    task_id='task2',
    params={'param1': 'param1_value',
            'param2': 'param2_value',
            'param2': 'param3_value'},
    dag=dag
)

task1 >> task2 …
Run Code Online (Sandbox Code Playgroud)

python airflow

5
推荐指数
1
解决办法
3139
查看次数

标签 统计

airflow ×1

python ×1