Airflow Python 操作符传递参数

Riy*_*med 6 python airflow

我正在尝试在气流 DAG 中编写一个 Python 运算符,并将某些参数传递给 Python 可调用对象。

我的代码如下所示。

def my_sleeping_function(threshold):
   print(threshold)

fmfdependency = PythonOperator(
   task_id='poke_check',
   python_callable=my_sleeping_function,
   provide_context=True,
   op_kwargs={'threshold': 100},
   dag=dag)

end = BatchEndOperator(
   queue=QUEUE,
   dag=dag)

start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)
Run Code Online (Sandbox Code Playgroud)

但我不断收到以下错误。

类型错误:my_sleeping_function() 得到了一个意外的关键字参数“dag_run”

无法弄清楚为什么。

tre*_*jas 7

在您的阈值参数之后将 **kwargs 添加到您的操作员参数列表中

  • 即使您没有为函数创建的参数,PythonOperator 也会将一组参数附加到您的函数调用中。这些运算符包括一些 Airflow 对象,例如上下文等。 (2认同)