Airflow:如何在 S3FileTransformOperator 中使用 python 函数或 python 脚本传递参数transform_script

Kar*_*Kar 1 boto3 airflow

要求:要求是下载 s3 文件并使用逻辑编辑该文件并将文件上传回相同的 s3 位置

逻辑:编辑文件的最后一行,我需要删除尾随分隔符和双引号,这是我能够使用 python 函数实现的

问题:如何将 python 脚本或 python 函数传递给操作员S3FileTransformOperator并更新文件?如果不可行S3FileTransformOperator,我怎样才能使用 boto3 实现下载、编辑和上传文件的功能

Python函数:

         # read the file into a list of lines
         lines = open(mys3file, 'r').readlines()
         lines[-1] = lines[-1].rstrip()
         fields = [field.replace('"', '') for field in lines[-1].split('|') if field]
         lines[-1] = '|'.join(fields)
         # now write the modified list back out to the file
         open(mys3file, 'w').writelines(lines)
Run Code Online (Sandbox Code Playgroud)

代码:

import logging
import datetime
from datetime import timedelta
import pprint

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
from airflow.operators.python_operator import PythonOperator

# Variables
tmpl_search_path = '/usr/local/airflow/dags/sql'
pp = pprint.PrettyPrinter(indent=4)

default_args = {
        'owner': 'business',
        'start_date': datetime.datetime(2020, 6, 23),
        'provide_context': True,
}

# Define the DAG
dag = DAG(
    dag_id='dag_A',
    schedule_interval='@daily',
    template_searchpath=tmpl_search_path,
    default_args=default_args,
    catchup=False,
)

transform_file_over = S3FileTransformOperator(
    task_id='transform_file_over',
    source_s3_key='s3://mybucket/myfile',
    dest_s3_key='s3://mybucket/myfile',
    transform_script= #how to call the python function
)

transform_file_over
Run Code Online (Sandbox Code Playgroud)

小智 6

S3FileTransformOperator有一个选项transform_script,您可以传递要用于文件数据转换的python脚本的位置。

脚步:

  1. 将您的逻辑放入 python 文件示例 /tmp/scripts/transform.py中,并将脚本文件复制到 dags 文件夹/airflow Worker 可以访问 python 脚本文件的位置。
  2. 添加python脚本文件的执行权限。chmod +x 变换.py
  3. 提供transform_script = "/tmp/scripts/transform.py" 作为运算符的参数。

请参阅此处了解详细示例:https ://github.com/jamesang17/airflow-app/tree/master/airflow

让我知道你的回应!