D_u*_*usv 3 python jinja2 airflow
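I wrote the following code for Airflow. I tested the script locally, outside of a DAG, and it works like a dream. Now I'm trying to get it running inside an Airflow DAG, but without any luck. I've tried several operators, to no avail.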
def fill_nulls (ds,file_in):
    csv_file = glob.glob(os.path.join(r'/tmp/', file_in))
    df = pd.read_csv(csv_file, sep='\t',header=None,error_bad_lines=False, index_col=False, dtype='unicode')
    df = df.fillna(r'\N')
    df.loc[:,df.dtypes==object].apply(lambda s:s.str.replace(" ", r'\N'))
    df.to_csv(csv_file,sep='\t',header=None,index=False, quoting=csv.QUOTE_NONE)

fill_nulls = PythonOperator(
    task_id='fill_nulls',
    python_callable=fill_nulls,
    provide_context=True,
    templates_dict = {'file_in':'apollo_export_{{macros.ds_format(macros.ds_add( ds, -2),\'%Y-%m-%d\',\'%Y%m%d\')}}.csv'},
    dag=dag
)
I get the following error:
: Traceback (most recent call last):
[2018-01-25 10:11:50,016] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-01-25 10:11:50,018] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-01-25 10:11:50,018] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-01-25 10:11:50,019] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-01-25 10:11:50,019] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask: return_value = self.execute_callable()
[2018-01-25 10:11:50,020] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-01-25 10:11:50,021] {base_task_runner.py:98} INFO - Subtask: return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-01-25 10:11:50,021] {base_task_runner.py:98} INFO - Subtask: TypeError: fill_nulls() got an unexpected keyword argument 'next_execution_date'
Any help would be greatly appreciated!
小智 8
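I think you are missing **kwargs in your function definition. With provide_context=True, Airflow passes the task context (including next_execution_date, the key named in your traceback) to the callable as keyword arguments, so the function has to accept them: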
def fill_nulls(ds, file_in, **kwargs):
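To illustrate why the extra keyword arguments matter, here is a minimal sketch that imitates the call in plain Python, without importing Airflow. The names fill_nulls_strict and fake_context are hypothetical and exist only for this illustration, and the context is cut down to three keys. One assumption worth checking against your Airflow version: as far as I know, in Airflow 1.x the rendered templates_dict reaches the callable as a single templates_dict key in the context rather than being unpacked into separate arguments, so file_in may need to be read from kwargs['templates_dict'] instead of being declared as a parameter.

```python
# Sketch only: a plain-Python stand-in for how PythonOperator invokes the
# callable when provide_context=True. No Airflow import is needed to run it.

def fill_nulls_strict(ds, file_in):
    """Signature from the question: no room for extra context keys."""
    return file_in


def fill_nulls(ds, **kwargs):
    """Accepts the whole context and reads the rendered template value."""
    # Assumption: the rendered templates_dict is passed under this key.
    return kwargs["templates_dict"]["file_in"]


# Hypothetical context; a real one contains many more keys.
fake_context = {
    "ds": "2018-01-25",
    "next_execution_date": None,
    "templates_dict": {"file_in": "apollo_export_20180123.csv"},
}

try:
    fill_nulls_strict(**fake_context)
    strict_error = None
except TypeError as exc:
    # The strict signature rejects the context keys it does not declare.
    strict_error = str(exc)

file_in = fill_nulls(**fake_context)
print(strict_error)
print(file_in)
```

The strict version fails with a TypeError as soon as it receives a key it does not declare, which is the same kind of failure shown in the traceback. The **kwargs version absorbs the whole context and picks out only what it needs.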