气流:模式运行气流子标记一次

gni*_*las 11 python etl directed-acyclic-graphs airflow apache-airflow

从气流文档:

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
Run Code Online (Sandbox Code Playgroud)

我知道subagoperator实际上是作为BackfillJob实现的,因此我们必须向schedule_interval运营商提供.但是,有没有办法获得子schedule_interval="@once"标记的语义等价物?我担心如果我使用set schedule_interval="@daily"为子标记,如果子标记运行时间超过一天,则子标记可能会运行多次.

def subdag_factory(parent_dag_name, child_dag_name, args):
    subdag = DAG(
        dag_id="{parent_dag_name}.{child_dag_name}".format(
            parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
        ),
        schedule_interval="@daily", # <--- this bit here
        default_args=args
    )

    ... do more stuff to the subdag here
    return subdag
Run Code Online (Sandbox Code Playgroud)

TLDR:如何伪造"每次触发父dag只运行一次这个子标记"

apa*_*man 5

我觉得这schedule=@once对我的子公司来说 效果很好.也许我的版本是过时的,但我有更多的问题与我subdags 失败,即使所有任务成功(或跳过的)不是相反.

现在我的机器上运行的实际示例代码非常愉快:

subdag_name = ".".join((parent_name,child_name))
logging.info(parent_name)
logging.info(subdag_name)
dag_subdag = DAG(
    dag_id=subdag_name,
    default_args=dargs,
    schedule_interval="@once",
)
Run Code Online (Sandbox Code Playgroud)

事实上,我最初构建了几乎所有的dag作为我的子标记的美化cfg文件.不确定经过一些试验和错误后的想法有多好,但是计划间隔对我来说绝不是一个阻碍者.

我正在运行一个相对较新的1.8版本,几乎没有自定义.我一直在遵循dag的建议,将我的子标记保存在dags文件夹中的文件夹中,这样它们就不会出现在DagBag中.


Pri*_*hta 3

尝试使用子DAG 的schedule=None的外部触发模式。在这种情况下,只有当父 dag 触发时才会运行

  • 为了澄清起见,您建议使用 [TriggerDagRunOperator](https://airflow.incubator.apache.org/code.html?highlight=trigger%20dagrun#airflow.operators.TriggerDagRunOperator) 来触发没有时间表的 dag?subdag 的关键是我们想要*阻塞*语义,触发 dagrun 运算符只是触发 dagrun,然后继续前进,而不会等到 dagrun 完成。此外,您无法在运行 subdag 的气流 UI 中获得透明度,您只知道触发了一些随机 dagrun。 (3认同)