python · dataframe · airflow
I want to create a dynamic number of tasks, based on the number of chunks obtained when reading a CSV in a previous task. On top of that, I want all of the dynamic tasks to run in parallel, and a final task to run once they have all finished. All of this inside the same DAG.
To make the picture clearer, I created a diagram:
And the code:
# python imports
import awswrangler as wr
import boto3
import pandas as pd
from datetime import datetime

# airflow imports
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


# First PythonOperator
def read_csv(s3_dir, client_s3, **kwargs):
    df_list = wr.s3.read_csv(s3_dir, sep='|', chunksize=100, boto3_session=client_s3)
    intermediate_dir = Variable.get("s3_intermediate_dir")
    number_of_chunks = 0
    for idx, df in enumerate(df_list):
        wr.s3.to_csv(df, intermediate_dir + "intermediate_{}".format(idx))
        number_of_chunks = idx + 1
    kwargs['ti'].xcom_push(key='number_of_chunks', value=number_of_chunks)


# * PythonOperator (one per chunk)
def transform_dataframes(s3_dir, client_s3, **kwargs):
    # DO SOMETHING
    pass


# Final PythonOperator
def agg_df_one(s3_dir, client_s3, **kwargs):
    # list all intermediate files
    all_files = wr.s3.list_objects(s3_dir, boto3_session=client_s3)
    # read and concatenate all chunks
    df_list = [wr.s3.read_csv(file, sep='|', boto3_session=client_s3) for file in all_files]
    final_df = pd.concat(df_list, axis=0, ignore_index=True)
    now = datetime.now()
    dt_string = now.strftime("%d/%m/%Y-%H")
    # save final result to s3
    final_dir = Variable.get("s3_final_dir")
    wr.s3.to_csv(final_df, final_dir + dt_string + "/final_df.csv")
    # delete the intermediate files
    wr.s3.delete_objects(all_files, boto3_session=client_s3)


default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'retries': 0,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    schedule_interval='@hourly',
    catchup=False,
    max_active_runs=3,
    concurrency=10,
)

client_s3 = boto3.Session(
    aws_access_key_id=Variable.get("s3_key_id"),
    aws_secret_access_key=Variable.get("s3_secret_key"),
)
wr.config.s3_endpoint_url = Variable.get("s3_endpoint_url")

read_csv_task = PythonOperator(
    task_id='read_csv_func',
    python_callable=read_csv,
    provide_context=True,
    op_kwargs={'s3_dir': Variable.get("init_s3_dir"), 'client_s3': client_s3},
    dag=dag,
)

# This is the part that does not work: the Jinja template below is just a plain
# string while the DAG file is parsed, so the comprehension cannot see the XCom value.
transform_dataframes_task = [PythonOperator(
    task_id=f'test_parallelism_dag.transform_dataframe_chunk_{i}',
    python_callable=transform_dataframes,
    provide_context=True,
    dag=dag,
) for i in "{{ ti.xcom_pull(key='number_of_chunks') }}"]

agg_df_one_task = PythonOperator(
    task_id='agg_df_one_func',
    python_callable=agg_df_one,
    provide_context=True,
    trigger_rule='none_failed',
    dag=dag,
)

read_csv_task >> transform_dataframes_task >> agg_df_one_task
My question is: how can I pass the size of the chunk list to "transform_dataframes_task" so that all the required tasks get created?
Thanks for your ideas!
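A note on why the list comprehension in the DAG file cannot see that value: XComs only exist at run time, after read_csv_func has actually executed, while the comprehension runs every time the scheduler parses the file. A pushed value can only be read back inside another task's callable, roughly like this (a minimal sketch; the callable name is made up, only the task id matches the DAG above):

def some_downstream_callable(**kwargs):
    # Runs at task run time, so XComs pushed by upstream tasks are available here.
    ti = kwargs['ti']
    number_of_chunks = ti.xcom_pull(task_ids='read_csv_func', key='number_of_chunks')
    print(f"read_csv_func produced {number_of_chunks} chunks")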
This is not possible today, but it will be soon.
We just approved an AIP (Airflow Improvement Proposal) in Apache Airflow that will enable exactly this pattern - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42%3A+Dynamic+Task+Mapping - and it will most likely land in Airflow 2.3.
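For reference, a rough sketch of what your pipeline could look like with the Dynamic Task Mapping API from AIP-42 / Airflow 2.3 (the task bodies and chunk values below are placeholders, not the original S3 logic):

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval='@hourly', start_date=datetime(2022, 1, 1), catchup=False)
def dynamic_chunks_dag():

    @task
    def read_csv():
        # Return one entry per chunk; this list drives the mapping at run time.
        return ["intermediate_0", "intermediate_1", "intermediate_2"]

    @task
    def transform_dataframe(chunk_path):
        # DO SOMETHING with a single chunk
        return chunk_path

    @task
    def agg_df_one(chunk_paths):
        # Runs once, after every mapped transform_dataframe task has finished.
        print(f"aggregating {len(list(chunk_paths))} chunks")

    # One transform_dataframe task instance is created per returned chunk.
    agg_df_one(transform_dataframe.expand(chunk_path=read_csv()))

dag_object = dynamic_chunks_dag()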