使用 PapermillOperator 运行气流作业时,dag 执行失败。
我在将参数传递给 PapermillOperator 时遇到问题。
我打开papermill_operator.py (packages/ airflow /operators/papermill_operator.py) 并硬编码一行来指定 papameters
def execute(self, context):
for i in range(len(self.inlets)):
pm.execute_notebook(self.inlets[i].location,
self.outlets[i].location,
parameters = dict(msgs="hello")
progress_bar=False, report_mode=True)
Run Code Online (Sandbox Code Playgroud)
然后它的工作
而原始代码是
def execute(self, context):
for i in range(len(self.inlets)):
pm.execute_notebook(self.inlets[i].location,
self.outlets[i].location,
parameters=self.inlets[i].parameters,
progress_bar=False, report_mode=True)
Run Code Online (Sandbox Code Playgroud)
尝试了另一个解决方案 https://github.com/nteract/papermill/issues/324#issuecomment-472446375 它工作正常
我的 DAG 代码是
import airflow
from airflow.models import DAG
from airflow.operators.papermill_operator import PapermillOperator
from datetime import timedelta
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='9', default_args=args,
schedule_interval='@once',
dagrun_timeout=timedelta(minutes=10))
run_this = PapermillOperator( …Run Code Online (Sandbox Code Playgroud)