最近我测试了气流这么多,execution_date在运行时有一个问题airflow trigger_dag <my-dag>.
我知道这execution_date不是我们第一次从这里想到的:
Airflow是作为ETL需求的解决方案而开发的.在ETL世界中,您通常会汇总数据.所以,如果我想总结2016-02-19的数据,我会在格林威治标准时间2016-02-20午夜进行,这将在2016-02-19的所有数据可用之后.
start_date = datetime.combine(datetime.today(),
datetime.min.time())
args = {
"owner": "xigua",
"start_date": start_date
}
dag = DAG(dag_id="hadoopprojects", default_args=args,
schedule_interval=timedelta(days=1))
wait_5m = ops.TimeDeltaSensor(task_id="wait_5m",
dag=dag,
delta=timedelta(minutes=5))
Run Code Online (Sandbox Code Playgroud)
上面的代码是我日常工作流程的开始部分,第一个任务是TimeDeltaSensor在实际工作之前等待另外5分钟,所以这意味着我的dag将被触发2016-09-09T00:05:00,2016-09-10T00:05:00......等等.
在Web UI中,我可以看到类似的东西scheduled__2016-09-20T00:00:00,并且运行任务2016-09-21T00:00:00,根据ETL模型看似合理.
但是有一天我的dag不会因为未知原因被触发,所以我手动触发它,如果我触发它2016-09-20T00:10:00,那么TimeDeltaSensor将等到2016-09-21T00:15:00运行之前.
这不是我想要的,我希望它2016-09-20T00:15:00不是在第二天运行,我试过execution_date通过--conf '{"execution_date": "2016-09-20"}',但它不起作用.
我该如何处理这个问题?
$ airflow version
[2016-09-21 17:26:33,654] {__init__.py:36} INFO - Using executor LocalExecutor
____________ _____________
____ |__( )_________ __/__ …Run Code Online (Sandbox Code Playgroud) 出于某种原因,Airflow 似乎不会触发具有每周计划间隔的 dag 的最新运行。
当前的日期:
$ date
$ Tue Aug 9 17:09:55 UTC 2016
Run Code Online (Sandbox Code Playgroud)
DAG:
from datetime import datetime
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
dag_id='superdag',
start_date=datetime(2016, 7, 18),
schedule_interval=timedelta(days=7),
default_args={
'owner': 'Jon Doe',
'depends_on_past': False
}
)
BashOperator(
task_id='print_date',
bash_command='date',
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
运行调度程序
$ airflow scheduler -d superdag
Run Code Online (Sandbox Code Playgroud)
您预计总共有四次 DAG 运行,因为调度程序应该回填 7/18、7/25、8/1 和 8/8。但是,未安排最后一次运行。
编辑 1:
我理解 Vineet 虽然这似乎并不能解释我的问题。
在我上面的示例中,DAG 的开始日期是 7 月 18 日。
任务开始时间找不到解决方案。我有代码,找不到我错了。
当我运行DAG时,分别为25.03、26.03、27.03。任务已经完成,但是今天(28.03)的任务没有在6:48开始。
我尝试使用cron表达式,钟摆,日期时间和结果相同。当地时间(UTC + 3)和气流时间(UTC)不同。我尝试在“开始日期”或“时间表间隔”中使用每次(本地,空气流量),但没有结果。
使用:Ubuntu,Airflow 1.9.0版和本地执行程序。
emailname = Variable.get('test_mail')
l_start_date = datetime(2018, 3, 25, 6, 48)
l_schedule_interval = '@daily'
WORKFLOW_DEFAULT_ARGS = {
'owner': 'owner',
'depends_on_past': True,
'start_date': l_start_date,
'email': emailname,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retries_delay': timedelta(minutes=1),
}
# initialize the DAG
dag = DAG(
dag_id='test_dag_mail',
default_args=WORKFLOW_DEFAULT_ARGS,
schedule_interval=l_schedule_interval,
start_date=l_start_date,
)
Run Code Online (Sandbox Code Playgroud)