要求:要求是下载 s3 文件并使用逻辑编辑该文件并将文件上传回相同的 s3 位置
逻辑:编辑文件的最后一行,我需要删除尾随分隔符和双引号,这是我能够使用 python 函数实现的
问题:如何将 python 脚本或 python 函数传递给操作员S3FileTransformOperator并更新文件?如果不可行S3FileTransformOperator,我怎样才能使用 boto3 实现下载、编辑和上传文件的功能
Python函数:
# read the file into a list of lines
lines = open(mys3file, 'r').readlines()
lines[-1] = lines[-1].rstrip()
fields = [field.replace('"', '') for field in lines[-1].split('|') if field]
lines[-1] = '|'.join(fields)
# now write the modified list back out to the file
open(mys3file, 'w').writelines(lines)
Run Code Online (Sandbox Code Playgroud)
代码:
import logging
import datetime
from datetime import timedelta
import pprint
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
from airflow.operators.python_operator import PythonOperator
# Variables
tmpl_search_path = '/usr/local/airflow/dags/sql'
pp = pprint.PrettyPrinter(indent=4)
default_args = {
'owner': 'business',
'start_date': datetime.datetime(2020, 6, 23),
'provide_context': True,
}
# Define the DAG
dag = DAG(
dag_id='dag_A',
schedule_interval='@daily',
template_searchpath=tmpl_search_path,
default_args=default_args,
catchup=False,
)
transform_file_over = S3FileTransformOperator(
task_id='transform_file_over',
source_s3_key='s3://mybucket/myfile',
dest_s3_key='s3://mybucket/myfile',
transform_script= #how to call the python function
)
transform_file_over
Run Code Online (Sandbox Code Playgroud)
小智 6
S3FileTransformOperator有一个选项transform_script,您可以传递要用于文件数据转换的python脚本的位置。
脚步:
请参阅此处了解详细示例:https ://github.com/jamesang17/airflow-app/tree/master/airflow
让我知道你的回应!
| 归档时间: |
|
| 查看次数: |
3465 次 |
| 最近记录: |