Alp*_*aCR 6 python directed-acyclic-graphs airflow airflow-scheduler
我想暂停空闲且冗余的 DAG,我如何知道哪些 DAG 未暂停、哪些 DAG 已暂停?
因此,我有一个 DAG 列表,需要使用执行 .bashrc 的 bash 命令来取消暂停airflow pause <dag_id>。我想通过检查每个 DAG 的状态来了解命令是否成功pause。我检查过airflow webserver,似乎所有暂停的 DAG 仍在运行。
def pause_idle_dags(dags = ["myTutorial"]):
"""
Pauses dags from the airflow
:param dags: dags considered to be idle
:return: Success state
"""
# TODO
for dag in dags:
command = "airflow pause {}".format(dag)
print(executeBashCommand(command))
def executeBashCommand(command):
print('========RUN========', command)
p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
print('========STDOUT========\n',stdout.decode())
print('========STDERR========\n',stderr.decode())
raise Exception('There is error while executing bash command: '+command+'\nwith log:\n'+stderr.decode())
return stdout, stderr
Run Code Online (Sandbox Code Playgroud)
当您运行指示它执行某些操作的气流命令时,它应该编辑它所连接的后端数据库的内部统计信息。默认情况下气流使用 SQLite。您可能已经设置了自己的。无论哪种方式,您都可以查询以进行检查。
例如airflow=# select * from dag where is_paused;
在这里您显然还可以执行更新,例如
airflow=# update dag set is_paused='false' where is_paused;
| 归档时间: |
|
| 查看次数: |
4176 次 |
| 最近记录: |