Alb*_* C. 5 python python-2.7 airflow apache-airflow
我正在尝试为我的气流jinja2模板添加自定义过滤器.
因为我在S3中的文件夹就像
/年月日/
,我的目的是在变量屏幕中使用yesterday_ds,如下所示:
s3://logs.web.com/AWSLogs/ {{yesterday_ds | get_year}}/{{yesterday_ds | get_month}}/{{yesterday_ds | get_day}} /
我见过的PR(我认为这是已经合并..),你可以在DAG对象创建的dag_args参数参数"user_defined_filters"做到这一点这里
问题是,即使这样做,它也会说"没有名为get_year的过滤器".
这是我的代码:
dag.py
dag = DAG(
dag_id='dag-name',
default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
template_searchpath=tmpl_search_path,
schedule_interval=timedelta(days=1),
max_active_runs=1,
)
Run Code Online (Sandbox Code Playgroud)
utils.py
def get_dag_args(**kwargs):
return {
'owner' : kwargs.get('owner', 'owner_name'),
'depends_on_past' : kwargs.get('depends_on_past', False),
'start_date' : kwargs.get('start_date', datetime.now() - timedelta(1)),
'email' : kwargs.get('email', ['blabla@blabla.com']),
'retries' : kwargs.get('retries', 5),
'provide_context' : kwargs.get('provide_context', True),
'retry_delay' : kwargs.get('retry_delay', timedelta(minutes=5)),
'user_defined_filters': get_date_filters()
}
def get_date_filters():
return dict(
get_year=lambda date_string: date_string.strftime('%Y'),
get_month=lambda date_string: date_string.strftime('%m'),
get_day=lambda date_string: date_string.strftime('%d'),
)
Run Code Online (Sandbox Code Playgroud)
有人看到我弄错了吗?谢谢!
编辑
在dag定义后打印这个,不幸的是显示没有自定义过滤器:(.
jinja_env = dag.get_template_env()
print(jinja_env.filters)
Run Code Online (Sandbox Code Playgroud)
此外,如果我尝试将其直接添加为DAG对象参数,如test @ tests/models.py中所示:
Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'
Run Code Online (Sandbox Code Playgroud)
编辑2
好吧,我看到的是我有1.8.0版本,而且这个没有过滤器.有人知道如何通过点子下载1.8.2rc吗?或者我们不能?
小智 0
pip 上的 Airflow 包装名称已更改。1.8.2rc1 可以使用 pip install apache-airflow 下载。
另请注意,根据邮件列表,他们目前正在将 1.8.2rc4 发布为 1.8.2。
| 归档时间: |
|
| 查看次数: |
997 次 |
| 最近记录: |