S. *_*eza 5 transform batch-processing amazon-web-services parquet amazon-sagemaker
我正在尝试使用镶木地板数据文件运行批量转换推理作业,但找不到任何内容。到处都说批量转换仅接受文本/csv 或 json 格式类型。出于测试目的,我确实尝试在 AWS 帐户内使用 lambda 函数来调用 parque 数据,但批量转换作业从未成功。出现 ClientError: 400,解析数据时出错。
request = \
{
"TransformJobName": batch_job_name,
"ModelName": model_name,
"BatchStrategy": "MultiRecord",
"TransformOutput": {
"S3OutputPath": batch_output
},
"TransformInput": {
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": batch_input
}
},
"ContentType": "application/x-parquet",
"SplitType": "Line",
"CompressionType": "None"
},
"TransformResources": {
"InstanceType": "ml.m4.xlarge",
"InstanceCount": 1
}
}
client.create_transform_job(**request)
return "Done"
Run Code Online (Sandbox Code Playgroud)
目前,我正在尝试使用 parque 数据文件在本地运行 sagemaker 批量转换作业。我有可以在本地终端中运行以“服务”的 docker 映像,并且可以使用 REST API 服务 Postman 从“localhost:8080/incalls”使用“二进制”输入函数上传 parque 数据文件来调用数据。它工作正常,我可以看到邮递员体内填充的数据。但是,我无法使用 parque 数据进行批量转换。
有没有人成功使用 parquet 文件使用 sagemaker 批量转换进行转换和预测?
有点晚了,但希望这对其他人有帮助。只是补充一下 @Setu Shah 提到的内容,这就是我在 Sagemaker 中序列化和反序列化 parquet 文件的方法:
from io import BytesIO
from typing import BinaryIO
import pandas as pd
from botocore.response import StreamingBody
def input_fn(
serialized_input_data: StreamingBody,
content_type: str = "application/x-parquet",
) -> pd.DataFrame:
"""Deserialize inputs"""
if content_type == "application/x-parquet":
data = BytesIO(serialized_input_data)
df = pd.read_parquet(data)
return df
else:
raise ValueError(
"Expected `application/x-parquet`."
)
def output_fn(output: pd.DataFrame, accept: str = "application/x-parquet") -> BinaryIO:
"""Model output handler"""
if accept == "application/x-parquet":
buffer = BytesIO()
output.to_parquet(buffer)
return buffer.getvalue()
else:
raise Exception("Requested unsupported ContentType in Accept: " + accept)
Run Code Online (Sandbox Code Playgroud)
Sagemaker Batch Transform 似乎不支持 parquet 格式,因此您必须有自己的解决方法来处理 parquet 数据集。您可以将 Parquet 数据集转换为推理端点支持的数据集(例如 text/csv 或 application/json),并在批量转换中使用此转换后的数据集。在 Spark 集群中,您可以通过执行以下简单操作来完成此操作:
sqlContext.read.parquet("input/parquet/location/").write.json("output/json/location")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6313 次 |
| 最近记录: |