如何动态迭代上游任务的输出以在气流中创建并行任务?

Aar*_*ron 5 python python-3.x airflow apache-airflow

请考虑以下DAG示例,其中第一个任务get_id_creds是从数据库中提取凭据列表.此操作告诉我数据库中的哪些用户可以运行进一步的数据预处理,并将这些ID写入文件/tmp/ids.txt.然后,我将这些ID扫描到我的DAG中,并使用它们生成upload_transaction可以并行运行的任务列表.

我的问题是:使用气流是否有更正确,更动态的方式来做到这一点?我在这里感觉笨拙和脆弱.如何直接将有效ID列表从一个进程传递到定义后续下游进程?

from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now(),
  'schedule_interval': None
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds, 
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in uids:
    upload_transactions = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_downstream(get_id_creds)
Run Code Online (Sandbox Code Playgroud)

Aar*_*ron 5

根据@Juan Riza的建议,我检查了以下链接:在Airflow中创建动态工作流的正确方法。尽管我能够简化解决方案,以至于我认为自己可以在此处提供自己的实现的修改版本,但这几乎是答案:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  # 'start_date': datetime.now(),
  'start_date': datetime(2017, 7, 18)
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in capone_dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)
Run Code Online (Sandbox Code Playgroud)

clear_tables清理将由于该过程而重建的数据库。id_worker是一项函数,该函数根据从返回的ID值数组动态生成新的预处理任务get_if_creds。任务ID只是相应的用户ID,尽管它很容易成为索引i,如上面提到的示例所示。

注意该位移位运算符(<<)在我看来是后向的,因为该clear_tables任务应该排在第一位,但是在这种情况下它似乎可以正常工作。


sdi*_*kby 3

考虑到 Apache Airflow 是一个工作流管理工具,即。它确定用户定义的任务之间的依赖关系(作为示例)与 apache Nifi(数据流管理工具),即。这里的依赖关系是通过任务传输的数据。

也就是说,我认为你的方法是正确的(我的评论基于发布的代码),Airflow 提供了一个称为XCom. 它允许任务通过传递一些数据在它们之间进行“交叉通信”。传递的数据应该有多大?由你来测试!但一般来说应该不会那么大。我认为它是以键、值对的形式存储在气流元数据库中,即你不能传递文件,但带有 ids 的列表可以工作。

就像我说的,你应该自己测试一下。我很高兴了解您的经历。是一个示例 dag,它演示了 的用法XCom这里是必要的文档。干杯!

  • 感谢您的反馈,@sdikby。昨天我花了一些时间研究“Xcom”,感觉我对它如何在任务之间交换数据有很强的概念掌握。我没有看到这种情况适用于这种情况,因为我正在寻找一种基于同一 DAG 中上游任务的输出创建任意数量任务的方法。我突然想到可以创建两个 DAG,一个用于定义任务参数,另一个用于执行,但这并不理想,因为我失去了依赖关系。您是否同意“Xcom”不支持此应用程序,或者我错过了什么? (2认同)