如何查明 DAG 在 Airflow 中是否已暂停/取消暂停?

Alp*_*aCR 6 python directed-acyclic-graphs airflow airflow-scheduler

我想暂停空闲且冗余的 DAG,我如何知道哪些 DAG 未暂停、哪些 DAG 已暂停?

因此,我有一个 DAG 列表,需要使用执行 .bashrc 的 bash 命令来取消暂停airflow pause <dag_id>。我想通过检查每个 DAG 的状态来了解命令是否成功pause。我检查过airflow webserver,似乎所有暂停的 DAG 仍在运行。

def pause_idle_dags(dags = ["myTutorial"]):
    """
    Pauses dags from the airflow
    :param dags: dags considered to be idle
    :return: Success state
    """
    # TODO
    for dag in dags:
        command = "airflow pause {}".format(dag)
        print(executeBashCommand(command))

def executeBashCommand(command):
    print('========RUN========', command)
    p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        print('========STDOUT========\n',stdout.decode())
        print('========STDERR========\n',stderr.decode())
        raise Exception('There is error while executing bash command: '+command+'\nwith log:\n'+stderr.decode())
    return stdout, stderr
Run Code Online (Sandbox Code Playgroud)

shw*_*ill 5

当您运行指示它执行某些操作的气流命令时,它应该编辑它所连接的后端数据库的内部统计信息。默认情况下气流使用 SQLite。您可能已经设置了自己的。无论哪种方式,您都可以查询以进行检查。

例如airflow=# select * from dag where is_paused;

在这里您显然还可以执行更新,例如

airflow=# update dag set is_paused='false' where is_paused;