气流定制jinja2过滤器

Alb*_* C. 5 python python-2.7 airflow apache-airflow

我正在尝试为我的气流jinja2模板添加自定义过滤器.

因为我在S3中的文件夹就像

/年月日/

,我的目的是在变量屏幕中使用yesterday_ds,如下所示:

s3://logs.web.com/AWSLogs/ {{yesterday_ds | get_year}}/{{yesterday_ds | get_month}}/{{yesterday_ds | get_day}} /

我见过的PR(我认为这是已经合并..),你可以在DAG对象创建的dag_args参数参数"user_defined_filters"做到这一点这里

问题是,即使这样做,它也会说"没有名为get_year的过滤器".

这是我的代码:

dag.py

   dag = DAG(
        dag_id='dag-name',
        default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
        template_searchpath=tmpl_search_path,
        schedule_interval=timedelta(days=1),
        max_active_runs=1,
        )
Run Code Online (Sandbox Code Playgroud)

utils.py

def get_dag_args(**kwargs):
return {
    'owner'               : kwargs.get('owner', 'owner_name'),
    'depends_on_past'     : kwargs.get('depends_on_past', False),
    'start_date'          : kwargs.get('start_date', datetime.now() - timedelta(1)),
    'email'               : kwargs.get('email', ['blabla@blabla.com']),
    'retries'             : kwargs.get('retries', 5),
    'provide_context'     : kwargs.get('provide_context', True),
    'retry_delay'         : kwargs.get('retry_delay', timedelta(minutes=5)),
    'user_defined_filters': get_date_filters()
    }


def get_date_filters():
    return dict(
        get_year=lambda date_string: date_string.strftime('%Y'),
        get_month=lambda date_string: date_string.strftime('%m'),
        get_day=lambda date_string: date_string.strftime('%d'),
        )
Run Code Online (Sandbox Code Playgroud)

有人看到我弄错了吗?谢谢!

编辑

在dag定义后打印这个,不幸的是显示没有自定义过滤器:(.

jinja_env = dag.get_template_env()
print(jinja_env.filters)
Run Code Online (Sandbox Code Playgroud)

此外,如果我尝试将其直接添加为DAG对象参数,如test @ tests/models.py中所示:

Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'
Run Code Online (Sandbox Code Playgroud)

编辑2

好吧,我看到的是我有1.8.0版本,而且这个没有过滤器.有人知道如何通过点子下载1.8.2rc吗?或者我们不能?

小智 0

pip 上的 Airflow 包装名称已更改。1.8.2rc1 可以使用 pip install apache-airflow 下载。

另请注意,根据邮件列表,他们目前正在将 1.8.2rc4 发布为 1.8.2。