有什么方法可以在不进行多任务处理的情况下按顺序运行回填?例如,如果我使用多个日期运行回填,例如气流回填 [dag] -s "2017-07-01" -e "2017-07-10",有没有办法在运行到第二天之前完成每个 dag ? 现在它在去下一个任务之前完成每项任务的所有日子。
谢谢。
我正在尝试为我的气流jinja2模板添加自定义过滤器.
因为我在S3中的文件夹就像
/年月日/
,我的目的是在变量屏幕中使用yesterday_ds,如下所示:
s3://logs.web.com/AWSLogs/ {{yesterday_ds | get_year}}/{{yesterday_ds | get_month}}/{{yesterday_ds | get_day}} /
我见过的PR(我认为这是已经合并..),你可以在DAG对象创建的dag_args参数参数"user_defined_filters"做到这一点这里
问题是,即使这样做,它也会说"没有名为get_year的过滤器".
这是我的代码:
dag.py
dag = DAG(
dag_id='dag-name',
default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
template_searchpath=tmpl_search_path,
schedule_interval=timedelta(days=1),
max_active_runs=1,
)
Run Code Online (Sandbox Code Playgroud)
utils.py
def get_dag_args(**kwargs):
return {
'owner' : kwargs.get('owner', 'owner_name'),
'depends_on_past' : kwargs.get('depends_on_past', False),
'start_date' : kwargs.get('start_date', datetime.now() - timedelta(1)),
'email' : kwargs.get('email', ['blabla@blabla.com']),
'retries' : kwargs.get('retries', 5),
'provide_context' : kwargs.get('provide_context', True),
'retry_delay' : kwargs.get('retry_delay', timedelta(minutes=5)),
'user_defined_filters': get_date_filters()
}
def get_date_filters():
return dict(
get_year=lambda date_string: date_string.strftime('%Y'),
get_month=lambda date_string: date_string.strftime('%m'),
get_day=lambda date_string: date_string.strftime('%d'),
)
Run Code Online (Sandbox Code Playgroud)
有人看到我弄错了吗?谢谢! …
我似乎无法让这个工作.
我试图每天发送一个给定的文件,其名称类似于'file _ {{ds_nodash}}.csv'.
问题是我似乎无法将此名称添加为文件名,因为它似乎无法使用.在电子邮件的文本或主题工作完美,而不是在名称上.
这是dag的一个例子:
local_file = 'file-{{ds_nodash}}.csv'
send_stats_csv = EmailOperator(
task_id='send-stats-csv',
to=['email@gmail.com'],
subject='Subject - {{ ds }}',
html_content='Here is the new file.',
files=[local_file],
dag=dag)
Run Code Online (Sandbox Code Playgroud)
错误代码:没有这样的文件或目录:u'file - {{ds_nodash}}.csv'
如果我按字面意思写出它的给定日期,它可以完美地工作.
我哪里错了?我该怎么办呢?
任何帮助,将不胜感激.
谢谢.
来自airflow文档的PD复制粘贴 - "Airflow引擎默认传递一些可在所有模板中访问的变量".https://airflow.incubator.apache.org/code.html
如果我理解正确,这些变量在执行时是可访问的,所以如果我执行dag,应该找到正确的文件?我已经尝试过测试任务或回填dag都没有成功.