Airflow TriggerDagRunOperator: how to change the execution date

ozw*_*5rd 6 triggers airflow

I noticed that for scheduled tasks, the execution date is set in the past, according to the Airflow docs:

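Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if I want to summarize data for 2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be right after all data for 2016-02-19 becomes available.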
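The timing rule in that quote can be sketched in plain Python. This is an illustration only, assuming a daily schedule aligned to midnight UTC; `scheduled_run_time` is a hypothetical helper, not an Airflow API. The run is labelled with the start of its interval and starts, at the earliest, once that interval has closed.

```python
from datetime import datetime, timedelta

# Assumption: a daily schedule aligned to midnight UTC, as in the 2016-02-19 example.
SCHEDULE_INTERVAL = timedelta(days=1)


def scheduled_run_time(execution_date: datetime) -> datetime:
    """Earliest time the run labelled `execution_date` can start.

    The label is the *start* of the interval; the run only starts once the
    interval has closed, i.e. one schedule interval later.
    """
    return execution_date + SCHEDULE_INTERVAL


if __name__ == "__main__":
    execution_date = datetime(2016, 2, 19)
    # prints: 2016-02-19 -> 2016-02-20 00:00:00
    print(execution_date.date(), "->", scheduled_run_time(execution_date))
```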

However, when a DAG triggers another DAG, the execution time of the triggered run is set to now().

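Is there a way to give the triggered DAG runs the same execution time as the triggering DAG? Of course, I could rewrite the templates and use yesterday_ds, but that is a hacky solution.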
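One way to see why yesterday_ds is fragile: it only lines up with the parent's date when the trigger fires within the day after the parent's interval closes. The sketch below uses hypothetical re-implementations of the `ds` and `yesterday_ds` template variables, assuming `ds` is the execution date as YYYY-MM-DD and `yesterday_ds` is one day earlier.

```python
from datetime import datetime, timedelta


def ds(execution_date: datetime) -> str:
    # Date stamp in YYYY-MM-DD form, modelled on Airflow's `ds` variable.
    return execution_date.strftime("%Y-%m-%d")


def yesterday_ds(execution_date: datetime) -> str:
    # One day before `ds`, modelled on Airflow's `yesterday_ds` variable.
    return ds(execution_date - timedelta(days=1))


if __name__ == "__main__":
    parent = datetime(2016, 2, 19)            # scheduled parent run
    child = datetime(2016, 2, 20, 0, 5)       # triggered at now(), shortly after
    # prints: 2016-02-19 2016-02-20 2016-02-19  (the workaround happens to match)
    print(ds(parent), ds(child), yesterday_ds(child))
    late_child = datetime(2016, 2, 21, 1, 0)  # trigger delayed, e.g. by a backlog
    # prints: 2016-02-20  (no longer the parent's date)
    print(yesterday_ds(late_child))
```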

7yl*_*l4r 6

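The class below extends TriggerDagRunOperator so that you can pass the execution date as a (templated) string, which is then converted back to a datetime. It's a bit hacky, but it's the only way I found to get the job done.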

from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder

class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicitly set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd)

    Usage Example:
    -------------------
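    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{ execution_date }}",
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        # python_callable receives (context, dag_run_obj) and returns the
        # DagRunOrder to trigger, or None to skip; here it triggers at random.
        python_callable=lambda context, dro: dro if random.getrandbits(1) else None,
        params={},
        dag=my_controller_dag
    )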
    """
    template_fields = ('execution_date',)

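    def __init__(
        self, trigger_dag_id, python_callable, execution_date,
        *args, **kwargs
    ):
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
            *args, **kwargs
        )
        # Set after the parent __init__ so the parent cannot overwrite it
        # (newer TriggerDagRunOperator versions assign self.execution_date too).
        self.execution_date = execution_date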

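    def execute(self, context):
        # By now Jinja has rendered self.execution_date, e.g. "2016-02-19 00:00:00".
        # Adjust the format if your Airflow version renders it differently.
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        # python_callable decides whether to trigger (returns dro) or skip (None)
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=run_id_dt,  # the parsed datetime, not the raw string
                conf=dro.payload,
                external_trigger=True)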
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")
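The core of the `execute` method above is a string-to-datetime round trip plus a run_id built from the result. The sketch below isolates that step so it can be checked without Airflow. It assumes the template renders `execution_date` as `str()` of a naive datetime (e.g. "2016-02-19 00:00:00"); other Airflow versions may pass a timezone-aware value whose string form differs, in which case the format needs adjusting. Both helpers are hypothetical names.

```python
from datetime import datetime

# Format the operator above expects the rendered template value to use.
RENDERED_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_rendered_execution_date(rendered: str) -> datetime:
    """Turn the Jinja-rendered string back into a datetime."""
    return datetime.strptime(rendered, RENDERED_FORMAT)


def make_run_id(execution_date: datetime) -> str:
    """Build a run_id the same way the operator above does."""
    return "trig__" + execution_date.isoformat()


if __name__ == "__main__":
    parent_execution_date = datetime(2016, 2, 19)
    rendered = str(parent_execution_date)  # assumed rendering of "{{ execution_date }}"
    dt = parse_rendered_execution_date(rendered)
    # prints: True trig__2016-02-19T00:00:00
    print(dt == parent_execution_date, make_run_id(dt))
```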

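There is one problem you may run into with this that you would not have with execution_date=now(): if you try to start the DAG twice with the same execution_date, the operator throws a database error (a MySQL error in my case). That's because dag_id and execution_date together form a unique index on the DAG run table, so a second row with the same pair cannot be inserted.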

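I can't think of a reason you would want to run the same DAG twice with the same execution_date in production anyway, but I ran into this while testing, so don't be surprised by it. Just clear out the old run or use a different datetime.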
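The collision described above can be reproduced with the standard library. This is a simplified stand-in, assuming only a table with a UNIQUE (dag_id, execution_date) constraint; it is not Airflow's real schema, and `create_dag_run` is a hypothetical helper. It also shows the "use a different datetime" workaround succeeding.

```python
import sqlite3

# Simplified stand-in for the DAG run table: only the columns needed to show
# the uniqueness rule. NOT Airflow's real schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        dag_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        run_id TEXT NOT NULL,
        UNIQUE (dag_id, execution_date)
    )
    """
)


def create_dag_run(dag_id: str, execution_date: str, run_id: str) -> bool:
    """Insert a run; return False if (dag_id, execution_date) already exists."""
    try:
        with conn:  # commits on success, rolls back and re-raises on error
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date, run_id) VALUES (?, ?, ?)",
                (dag_id, execution_date, run_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False


if __name__ == "__main__":
    ed = "2016-02-19T00:00:00"
    print(create_dag_run("my_target_dag_id", ed, "trig__" + ed))  # True: first trigger
    print(create_dag_run("my_target_dag_id", ed, "trig__" + ed))  # False: same date again
    other = "2016-02-19T00:00:01"
    print(create_dag_run("my_target_dag_id", other, "trig__" + other))  # True: new date
```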


Ena*_*Ena 6

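TriggerDagRunOperator now has an execution_date parameter that sets the execution date of the triggered run. Unfortunately, the parameter is not in the template fields. If you add it to the template fields (or override the operator and change its template_fields value), you can use it like this: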

my_trigger_task = TriggerDagRunOperator(
    task_id='my_trigger_task',
    trigger_dag_id="triggered_dag_id",
    python_callable=conditionally_trigger,
    execution_date='{{ execution_date }}',
    dag=dag,
)
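Why overriding template_fields is enough can be illustrated without Airflow. The sketch below is a hypothetical, heavily simplified model: `FakeOperator`, `render_template_fields` and the regex-based `render` stand in for Airflow's operator base class and Jinja2, and only handle bare `{{ name }}` placeholders. It shows that only attributes listed in template_fields get rendered.

```python
import re
from datetime import datetime

# Minimal stand-in for Jinja: replaces "{{ name }}" with str(context[name]).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, context: dict) -> str:
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), template)


class FakeOperator:
    """Hypothetical operator base: renders only attributes in template_fields."""

    template_fields = ()

    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)

    def render_template_fields(self, context: dict) -> None:
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str):
                setattr(self, field, render(value, context))


class FakeTriggerOperator(FakeOperator):
    template_fields = ()  # execution_date is NOT templated


class TemplatedFakeTriggerOperator(FakeTriggerOperator):
    template_fields = ("execution_date",)  # the subclass override


if __name__ == "__main__":
    context = {"execution_date": datetime(2016, 2, 19)}
    plain = FakeTriggerOperator(execution_date="{{execution_date}}")
    templated = TemplatedFakeTriggerOperator(execution_date="{{execution_date}}")
    plain.render_template_fields(context)
    templated.render_template_fields(context)
    print(plain.execution_date)      # {{execution_date}}
    print(templated.execution_date)  # 2016-02-19 00:00:00
```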

It hasn't been released yet, but you can see the source here: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py

The commit that made the change is: https://github.com/apache/incubator-airflow/commit/089c996fbd9ecb0014dbefedff232e8699ce6283#diff-41f9029188bd5e500dec9804fed26fb4