How can I create a dynamic number of tasks based on the result of a previous task in the same DAG?

big*_*add 2 python dataframe airflow

I want to create a dynamic number of tasks, where the number depends on how many chunks are produced when a CSV is read in the previous task. On top of that, I want all of those dynamic tasks to run in parallel, and a final task to run once they have all finished. All of this inside the same DAG.

To make the picture clearer, I made a diagram:

And the code:

#python imports
import awswrangler as wr
import boto3
import pandas as pd
from datetime import datetime
#airflow imports
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

# First PythonOperator
def read_csv(s3_dir,client_s3,**kwargs):
    ti = kwargs['ti']

    # with chunksize, read_csv returns an iterator of DataFrames
    df_list = wr.s3.read_csv(s3_dir, sep='|', chunksize = 100,boto3_session = client_s3)

    intermediate_dir = Variable.get("s3_intermediate_dir")

    number_of_chunks = 0
    for idx, df in enumerate(df_list):
        wr.s3.to_csv(df,intermediate_dir + "intermediate_{}".format(idx))
        number_of_chunks = idx + 1

    ti.xcom_push(key='number_of_chunks', value=number_of_chunks)

# * PythonOperator (one per chunk)
def transform_dataframes(s3_dir,client_s3,**kwargs):
    #DO SOMETHING with one chunk
    pass
    
# Final PythonOperator
def agg_df_one(s3_dir,client_s3,**kwargs):
    #list all intermediate files
    all_files = wr.s3.list_objects(s3_dir, boto3_session = client_s3)
    #read each file whole (no chunksize here, so read_csv returns DataFrames that pd.concat can take)
    df_list = [wr.s3.read_csv(file, sep='|', boto3_session = client_s3) for file in all_files]

    final_df = pd.concat(df_list, axis=0, ignore_index=True)
    now = datetime.now()
    dt_string = now.strftime("%d/%m/%Y-%H")
    
    #save final s3
    final_dir = Variable.get("s3_final_dir")
    wr.s3.to_csv(final_df,final_dir + dt_string + "/final_df.csv")    
    
    #delete all
    wr.s3.delete_objects(all_files,boto3_session = client_s3)
    
default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'retries': 0,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    schedule_interval='@hourly',
    catchup=False,
    max_active_runs=3,
    concurrency = 10,
)

client_s3 = boto3.Session(aws_access_key_id = Variable.get("s3_key_id"), aws_secret_access_key = Variable.get("s3_secret_key"))

wr.config.s3_endpoint_url = Variable.get("s3_endpoint_url")

read_csv_task = PythonOperator(
    task_id='read_csv_func',
    python_callable=read_csv,
    provide_context=True,
    op_kwargs={'s3_dir': Variable.get("init_s3_dir"), 'client_s3': client_s3},
    dag=dag
)

transform_dataframes_task = [PythonOperator(
    task_id=f'test_parallelism_dag.transform_dataframe_chunk_{i}',
    python_callable=transform_dataframes,
    provide_context=True,
    dag=dag
) for i in "{{ ti.xcom_pull(key='number_of_chunks') }}"]


agg_df_one_task = PythonOperator(
    task_id='agg_df_one_func',
    python_callable=agg_df_one,
    provide_context=True,
    op_kwargs={'s3_dir': Variable.get("s3_intermediate_dir"), 'client_s3': client_s3},
    trigger_rule = 'none_failed',
    dag=dag
)

read_csv_task >> transform_dataframes_task >> agg_df_one_task


My question is: how can I pass the number of chunks to "transform_dataframes_task" so that all of the required tasks are created?

Thanks for your ideas!

Jar*_*iuk 5

This is not possible today, but it will become possible soon.
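The reason the approach in the question cannot work is that the set of tasks in a DAG is fixed when the DAG file is parsed, while XCom values only exist at run time. At parse time the Jinja template in the list comprehension is just a literal string, so iterating over it loops over its characters, not over the chunk count. A quick illustration in plain Python (nothing Airflow-specific):

# At parse time the template has not been rendered, so it is only a string:
template = "{{ ti.xcom_pull(key='number_of_chunks') }}"

# Iterating over it yields its characters, not the number of chunks,
# so the comprehension would create one task per character.
print(list(template)[:5])   # ['{', '{', ' ', 't', 'i']
print(len(template))        # length of the literal string, unrelated to the CSV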

We have just approved an AIP (Airflow Improvement Proposal) in Apache Airflow that will enable exactly this pattern - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42%3A+Dynamic+Task+Mapping - and it will most likely be implemented in Airflow 2.3.
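Roughly, the pattern the AIP describes looks like the sketch below (a minimal illustration based on the proposal, not the exact pipeline from the question; names such as expand() may still change before release):

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
def dynamic_chunks_sketch():

    @task
    def split_csv():
        # would return one entry per chunk written to S3
        return [0, 1, 2]

    @task
    def transform_chunk(chunk_id):
        # one mapped task instance is created per element of the upstream list
        print(f"transforming chunk {chunk_id}")

    @task
    def aggregate(results):
        print("aggregating all chunks")

    # expand() maps the task over the upstream result at run time
    aggregate(transform_chunk.expand(chunk_id=split_csv()))

dynamic_chunks_sketch_dag = dynamic_chunks_sketch()

Everything stays inside one DAG: one mapped task instance is created per element of the list returned by the upstream task, and the aggregation task only runs after all mapped instances have finished, which matches the layout the question asks for.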