小编Alb*_* C.的帖子

强制 Airflow 的回填命令按顺序运行

有什么方法可以在不进行多任务处理的情况下按顺序运行回填?例如,如果我使用多个日期运行回填,例如气流回填 [dag] -s "2017-07-01" -e "2017-07-10",有没有办法在运行到第二天之前完成每个 dag ? 现在它在去下一个任务之前完成每项任务的所有日子。

谢谢。

python python-2.7 airflow

7
推荐指数
1
解决办法
2385
查看次数

气流定制jinja2过滤器

我正在尝试为我的气流jinja2模板添加自定义过滤器.

因为我在S3中的文件夹就像

/年月日/

,我的目的是在变量屏幕中使用yesterday_ds,如下所示:

s3://logs.web.com/AWSLogs/ {{yesterday_ds | get_year}}/{{yesterday_ds | get_month}}/{{yesterday_ds | get_day}} /

我见过的PR(我认为这是已经合并..),你可以在DAG对象创建的dag_args参数参数"user_defined_filters"做到这一点这里

问题是,即使这样做,它也会说"没有名为get_year的过滤器".

这是我的代码:

dag.py

   dag = DAG(
        dag_id='dag-name',
        default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
        template_searchpath=tmpl_search_path,
        schedule_interval=timedelta(days=1),
        max_active_runs=1,
        )
Run Code Online (Sandbox Code Playgroud)

utils.py

def get_dag_args(**kwargs):
return {
    'owner'               : kwargs.get('owner', 'owner_name'),
    'depends_on_past'     : kwargs.get('depends_on_past', False),
    'start_date'          : kwargs.get('start_date', datetime.now() - timedelta(1)),
    'email'               : kwargs.get('email', ['blabla@blabla.com']),
    'retries'             : kwargs.get('retries', 5),
    'provide_context'     : kwargs.get('provide_context', True),
    'retry_delay'         : kwargs.get('retry_delay', timedelta(minutes=5)),
    'user_defined_filters': get_date_filters()
    }


def get_date_filters():
    return dict(
        get_year=lambda date_string: date_string.strftime('%Y'),
        get_month=lambda date_string: date_string.strftime('%m'),
        get_day=lambda date_string: date_string.strftime('%d'),
        )
Run Code Online (Sandbox Code Playgroud)

有人看到我弄错了吗?谢谢! …

python python-2.7 airflow apache-airflow

5
推荐指数
1
解决办法
997
查看次数

如何在EmailOperator任务的文件名中添加模板变量?(空气流动)

我似乎无法让这个工作.

我试图每天发送一个给定的文件,其名称类似于'file _ {{ds_nodash}}.csv'.

问题是我似乎无法将此名称添加为文件名,因为它似乎无法使用.在电子邮件的文本或主题工作完美,而不是在名称上.

这是dag的一个例子:

local_file = 'file-{{ds_nodash}}.csv'

send_stats_csv = EmailOperator(
    task_id='send-stats-csv',
    to=['email@gmail.com'],
    subject='Subject - {{ ds }}',
    html_content='Here is the new file.',
    files=[local_file],
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

错误代码:没有这样的文件或目录:u'file - {{ds_nodash}}.csv'

如果我按字面意思写出它的给定日期,它可以完美地工作.

我哪里错了?我该怎么办呢?

任何帮助,将不胜感激.

谢谢.

来自airflow文档的PD复制粘贴 - "Airflow引擎默认传递一些可在所有模板中访问的变量".https://airflow.incubator.apache.org/code.html

如果我理解正确,这些变量在执行时是可访问的,所以如果我执行dag,应该找到正确的文件?我已经尝试过测试任务或回填dag都没有成功.

python python-2.7 airflow apache-airflow

5
推荐指数
1
解决办法
3693
查看次数

标签 统计

airflow ×3

python ×3

python-2.7 ×3

apache-airflow ×2