使用DB动态生成气流任务

Muk*_*ain 2 directed-acyclic-graphs airflow airflow-scheduler

我想像这样运行气流 dag ->

  • 我有 2 个气流工人 W1 和 W2。
  • 在 W1 中,我安排了一个任务 (W1-1),但在 W2 中,我想创建 X 个任务(W2-1、W2-2 ... W2-X)。
  • 每个任务的编号 X 和 bash 命令将来自数据库调用。
  • 工人 W2 的所有任务应在 W1 完成后并行运行。

这是我的代码

dag = DAG('deploy_single', catchup=False, default_args=default_args, schedule_interval='16 15 * * *')

t1 = BashOperator(
        task_id='dummy_task',
        bash_command='echo hi > /tmp/hi',
        queue='W1_queue',
        dag=dag)

get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"

db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)

cursor = connection.cursor()

cursor.execute(get_all_engines)
records = cursor.fetchall()
i = 1
for record in records:
    t = BashOperator(
        task_id='script_test_'+str(i),
        bash_command="{full_command} ".format(full_command=str(record[0])),
        queue=str(record[1]),
        dag=dag)
    t.set_upstream(t1)
    i += 1

cursor.close()
connection.close()
Run Code Online (Sandbox Code Playgroud)

但是,当我运行它时,W1 上的任务成功完成,但 W2 上的所有任务都失败了。在气流 UI 中,我可以看到它可以解决正确数量的任务(在本例中为 10),但是这 10 个任务中的每一个都失败了。

查看日志,我看到在 W2(在另一台机器上)上,气流找不到该db_creds.json文件。

我不想向 W2 提供 DB creds 文件。

我的问题是在这种情况下如何动态创建气流任务?基本上我想在气流服务器上运行一个数据库查询,并根据该查询的结果将任务分配给一个或多个工作人员。数据库将包含有关哪些引擎处于活动状态等的更新信息,我希望 DAG 反映这一点。从日志来看,每个工作人员似乎都在运行数据库查询。为每个工作人员提供对数据库的访问不是一种选择。

Muk*_*ain 5

谢谢@viraj-parekh 和@cwurtz。

经过多次反复试验,找到了在这种情况下使用气流变量的正确方法。

步骤 1) 我们创建另一个名为的脚本gen_var.py并将其放在 dag 文件夹中。这样,调度程序将选取并生成变量。如果生成变量的代码在deploy_singledag 中,那么我们会遇到相同的依赖问题,因为工作人员也会尝试处理 dag。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
import json
import psycopg2
from airflow.models import Variable
from psycopg2.extensions import AsIs

get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"

db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)

cursor = connection.cursor()

cursor.execute(get_all_engines)
records = cursor.fetchall()

hosts = {}
i = 1
for record in records:
    comm_dict = {}
    comm_dict['full_command'] = str(record[0])
    comm_dict['queue_name'] = str(record[1])
    hosts[i] = comm_dict
    i += 1

cursor.close()
connection.close()

Variable.set("hosts",hosts,serialize_json=True)
Run Code Online (Sandbox Code Playgroud)

请注意对 的调用serialize_json。Airflow 会尝试将变量存储为字符串。如果您希望将其存储为 dict,请使用serialize_json=True. Airflow 仍将通过以下方式将其存储为字符串json.dumps

第 2 步)简化 dag 并调用此"hosts"变量(现在反序列化以取回 dict),如下所示 -

hoztz = Variable.get("hosts",deserialize_json=True)
for key in hoztz:
    host = hoztz.get(key)
    t = BashOperator(
        task_id='script_test_'+str(key),
        bash_command="{full_command} ".format(full_command=str(host.get('full_command'))),
        queue=str(host.get('queue_name')),
        dag=dag)
    t.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

希望它可以帮助别人。