dev*_*v ツ 6 airflow airflow-scheduler
我正在从外部文件读取元素列表并循环遍历元素以创建一系列任务。
例如,如果文件中有 2 个元素 - [A, B]。将有2个系列的任务:
A1 -> A2 ..
B1 -> B2 ...
Run Code Online (Sandbox Code Playgroud)
这种读取元素逻辑不是任何任务的一部分,而是 DAG 本身的一部分。因此,调度程序每天在读取 DAG 文件时会多次调用它。我只想在 DAG 运行时调用它。
想知道是否已经有这种用例的模式?
Nic*_*coE 17
根据您的要求,如果您正在寻找的是避免多次读取文件,但您不介意从元数据数据库中读取多次,那么您可以将用作迭代源的方法更改Variables为动态创建任务。
一个基本的示例可以是在 a 内执行文件读取PythonOperator并设置Variables稍后将用于迭代的 (相同的可调用):
样本文件.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
Run Code Online (Sandbox Code Playgroud)
任务定义:
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
Variable.set(key='list_of_cities',
value=data['cities'], serialize_json=True)
print('Loading Variable from file...')
def _say_hello(city_name):
print('hello from ' + city_name)
with DAG('dynamic_tasks_from_var', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
read_file = PythonOperator(
task_id='read_file',
python_callable=_read_file
)
Run Code Online (Sandbox Code Playgroud)
然后您可以读取该变量并创建动态任务。(设置一个很重要default_var)。是TaskGroup可选的。
# Top-level code
updated_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
print(f'Updated LIST: {updated_list}')
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
for index, city in enumerate(updated_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_say_hello,
op_kwargs={'city_name': city}
)
# DAG level dependencies
read_file >> dynamic_tasks_group
Run Code Online (Sandbox Code Playgroud)
在调度程序日志中,您只会找到:
INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
Run Code Online (Sandbox Code Playgroud)
达格图视图:
通过这种方法,顶层代码(因此由调度程序连续读取)是对方法的调用Variable.get()。如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(本文中的示例)。
| 归档时间: |
|
| 查看次数: |
272 次 |
| 最近记录: |