I am trying to customize my PythonOperator and have placed it under $AIRFLOW_HOME/plugins, as follows:
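# Import paths below are an assumption (Airflow 1.x layout); the original snippet omitted them
from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults


class MyPythonOperator(PythonOperator):
    def my_callable(param1, param2, param3):
        # do something
        pass

    @apply_defaults
    def __init__(self, task_id, *args, **kwargs):
        super(MyPythonOperator, self).__init__(
            task_id=task_id,
            python_callable=self.my_callable,
            provide_context=True,
            *args, **kwargs)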
Then I defined an Airflow DAG, a very simple one with only two tasks:
from datetime import timedelta

import airflow
from airflow import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
    dag_id='example_workflow',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)
task1 = MyPythonOperator(
    task_id='task1',
    params={'param1': 'param1_value',
            'param2': 'param2_value',
            'param3': 'param3_value'},
    dag=dag
)
task2 = MyPythonOperator(
    task_id='task2',
    params={'param1': 'param1_value',
            'param2': 'param2_value',
            'param3': 'param3_value'},
    dag=dag
)
task1 >> task2 …
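
For anyone following along, here is a minimal sketch in plain Python (no Airflow import) of how values passed through `params` might reach a callable. It assumes Airflow 1.x behaviour, where `provide_context=True` makes the operator call the callable with the task context as keyword arguments and the `params` dict sits under the `params` key. `fake_execute`, `context_callable` and `positional_callable` are hypothetical names used only for illustration, and the context here is reduced to two keys.

```python
def context_callable(**context):
    """Read the three values from the 'params' entry of the context."""
    params = context["params"]
    return params["param1"], params["param2"], params["param3"]


def positional_callable(param1, param2, param3):
    """Same signature style as my_callable in the question."""
    return param1, param2, param3


def fake_execute(python_callable, params):
    """Hypothetical stand-in for the operator's execute step.

    Assumption: the callable is invoked with the context as keyword
    arguments; only a tiny subset of the real context is modelled.
    """
    context = {"params": params, "ds": "2020-01-01"}
    return python_callable(**context)


task_params = {
    "param1": "param1_value",
    "param2": "param2_value",
    "param3": "param3_value",
}

# A callable that accepts **context can pull the values out of 'params'.
result = fake_execute(context_callable, task_params)

# A callable with only positional parameters rejects the context keywords.
try:
    fake_execute(positional_callable, task_params)
    positional_failed = False
except TypeError:
    positional_failed = True

print(result, positional_failed)
```

Under these assumptions, a callable declared as `my_callable(param1, param2, param3)` would not receive the `params` entries as separate arguments; it would need to accept the context and look them up, as `context_callable` does.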