相关疑难解决方法(0)

如何动态迭代上游任务的输出以在气流中创建并行任务?

请考虑以下DAG示例,其中第一个任务get_id_creds是从数据库中提取凭据列表.此操作告诉我数据库中的哪些用户可以运行进一步的数据预处理,并将这些ID写入文件/tmp/ids.txt.然后,我将这些ID扫描到我的DAG中,并使用它们生成upload_transaction可以并行运行的任务列表.

我的问题是:使用气流是否有更正确,更动态的方式来做到这一点?我在这里感觉笨拙和脆弱.如何直接将有效ID列表从一个进程传递到定义后续下游进程?

from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now(),
  'schedule_interval': None
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds, 
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in uids:
    upload_transactions = PythonOperator( …
Run Code Online (Sandbox Code Playgroud)

python python-3.x airflow apache-airflow

5
推荐指数
2
解决办法
2387
查看次数

标签 统计

airflow ×1

apache-airflow ×1

python ×1

python-3.x ×1