Broken DAG: [/opt/airflow/dags/my_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 179, in apply_defaults
    result = func(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 136, in __init__
    raise AirflowException('python_callable param must be callable')
airflow.exceptions.AirflowException: python_callable param must be callable
import airflow
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from datetime import datetime
from random import randint
from airflow.operators.bash import BashOperator

def _training_model():
    return randint(1, 11)

def _choose_best_model(ti):
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A'
        'training_model_B'
        'training_model_C'
    ])
    best_accuracy = max(accuracies)
    if best_accuracy > 8:
        return 'accurate'
    return 'inaccurate'
with DAG(
    dag_id="mobile_app_usage", start_date=datetime(2021, 1, 1),
    schedule_interval="@daily", catchup=False) as dag:

    training_model_A = PythonOperator(
        task_id="training_model_A",
        python_callable=_training_model()
    )
    training_model_B = PythonOperator(
        task_id="training_model_B",
        python_callable=_training_model()
    )
    training_model_C = PythonOperator(
        task_id="training_model_B",
        python_callable=_training_model()
    )
    choose_best_model = BranchPythonOperator(
        task_id="choose_best_model",
        python_callable=_choose_best_model()
    )
    accurate = BashOperator(
        task_id="accurate",
        bash_command="echo Accurate"
    )
    inaccurate = BashOperator(
        task_id="inaccurate",
        bash_command="echo Inaccurate"
    )
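The python_callable parameter expects the callable itself (the function name without parentheses), not the result of calling it. With python_callable=_training_model(), the function runs while Airflow parses the DAG file, and PythonOperator receives its return value, an int. An int is not callable, and that triggers the exception. Pass the function without parentheses, like this: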
...
    training_model_A = PythonOperator(
        task_id="training_model_A",
        python_callable=_training_model
    )
    training_model_B = PythonOperator(
        task_id="training_model_B",
        python_callable=_training_model
    )
    training_model_C = PythonOperator(
        task_id="training_model_C",
        python_callable=_training_model
    )
    choose_best_model = BranchPythonOperator(
        task_id="choose_best_model",
        python_callable=_choose_best_model
    )
...
FYI: in the snippet above I also changed the task_id of the training_model_C task to "training_model_C", because it duplicated the task_id "training_model_B". In Airflow, task_id values must be unique within a DAG.