Sim*_*ian 4 amazon-s3 amazon-web-services aws-glue
I am very new to AWS Glue. I want to use AWS Glue to uncompress a huge file sitting in an S3 bucket and write the contents back to S3.
I couldn't find anything on this requirement when I tried googling it.
My question is:
I am using AWS Glue Studio. Any help will be greatly appreciated.
小智 5
In case you are still looking for a solution: you can unzip the files and write them back with an AWS Glue job using boto3 and the Python zipfile library.
One thing to consider is the size of the zip archives you are dealing with. I used the script below with a 6 GB (compressed) / 30 GB (uncompressed) file and it worked fine, but it could fail if the files are too big for the workers to buffer in memory (a disk-backed variant is sketched after the script).
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
import boto3
import io
from zipfile import ZipFile
s3 = boto3.client("s3")
bucket = "wayfair-datasource" # your s3 bucket name
prefix = "files/location/" # the prefix for the objects that you want to unzip
unzip_prefix = "files/unzipped_location/" # the location where you want to store your unzipped files
# Get a list of all the resources in the specified prefix
objects = s3.list_objects(
    Bucket=bucket,
    Prefix=prefix
)["Contents"]
# The following will get the unzipped files so the job doesn't try to unzip a file that is already unzipped on every run
unzipped_objects = s3.list_objects(
    Bucket=bucket,
    Prefix=unzip_prefix
).get("Contents", [])  # default to [] so the first run doesn't fail when the unzip prefix is still empty
# Get a list containing the keys of the objects to unzip
object_keys = [ o["Key"] for o in objects if o["Key"].endswith(".zip") ]
# Get the keys for the unzipped objects
unzipped_object_keys = [ o["Key"] for o in unzipped_objects ]
for key in object_keys:
    obj = s3.get_object(
        Bucket=bucket,
        Key=key
    )
    # Read the whole zip into memory; see the note below for very large archives
    objbuffer = io.BytesIO(obj["Body"].read())
    # Using a context manager so you don't have to worry about manually closing the file
    with ZipFile(objbuffer) as zip:
        filenames = zip.namelist()
        # Iterate over every file inside the zip
        for filename in filenames:
            with zip.open(filename) as file:
                filepath = unzip_prefix + filename
                if filepath not in unzipped_object_keys:
                    s3.upload_fileobj(file, bucket, filepath)

job.commit()
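If the archives are too big to buffer in memory, one workaround is to download each zip to the worker's local disk and stream the members from there instead of reading the whole object into a BytesIO buffer. The sketch below is not part of the original answer; it assumes the Glue worker has enough free local disk space for one archive and reuses the bucket, unzip_prefix and unzipped_object_keys values from the script above (unzip_via_local_disk is a hypothetical helper name).

import os
import tempfile
from zipfile import ZipFile
import boto3

s3 = boto3.client("s3")

def unzip_via_local_disk(bucket, key, unzip_prefix, already_unzipped):
    # Download the archive to local disk instead of holding it all in memory
    with tempfile.TemporaryDirectory() as tmpdir:
        local_zip = os.path.join(tmpdir, os.path.basename(key))
        s3.download_file(bucket, key, local_zip)
        with ZipFile(local_zip) as zf:
            for member in zf.namelist():
                if member.endswith("/"):
                    continue  # skip directory entries inside the zip
                target_key = unzip_prefix + member
                if target_key in already_unzipped:
                    continue
                # zf.open() returns a file-like object, so each member is
                # streamed to S3 rather than loaded into memory at once
                with zf.open(member) as member_file:
                    s3.upload_fileobj(member_file, bucket, target_key)

With this helper, the body of the for key in object_keys: loop above collapses to a single call: unzip_via_local_disk(bucket, key, unzip_prefix, unzipped_object_keys).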