Ari*_*iel 7 python file amazon-s3 boto3 fastapi
我有一个 FastAPI 端点,它接收文件,将其上传到 s3,然后对其进行处理。除了处理失败外,一切正常,并显示以下消息:
File "/usr/local/lib/python3.9/site-packages/starlette/datastructures.py", line 441, in read
return self.file.read(size)
File "/usr/local/lib/python3.9/tempfile.py", line 735, in read
return self._file.read(*args)
ValueError: I/O operation on closed file.
Run Code Online (Sandbox Code Playgroud)
我的简化代码如下所示:
async def process(file: UploadFile):
reader = csv.reader(iterdecode(file.file.read(), "utf-8"), dialect="excel") # This fails!
datarows = []
for row in reader:
datarows.append(row)
return datarows
Run Code Online (Sandbox Code Playgroud)
如何读取上传文件的内容?
更新
我设法进一步隔离问题。这是我的简化端点:
import boto3
from loguru import logger
from botocore.exceptions import ClientError
UPLOAD = True
@router.post("/")
async def upload(file: UploadFile = File(...)):
if UPLOAD:
# Upload the file
s3_client = boto3.client("s3", endpoint_url="http://localstack:4566")
try:
s3_client.upload_fileobj(file.file, "local", "myfile.txt")
except ClientError as e:
logger.error(e)
contents = await file.read()
return JSONResponse({"message": "Success!"})
Run Code Online (Sandbox Code Playgroud)
如果UPLOAD是 True,我会收到错误。如果不是,一切正常。看来 boto3 正在上传文件后关闭该文件。有什么办法可以重新打开该文件吗?或发送一份副本至upload_fileobj?
Chr*_*ris 15
FastAPI(实际上是 Starlette)UploadFile(也请参阅Starlette 的文档)使用 Python SpooledTemporaryFile,“存储在内存中的文件达到最大大小限制,超过此限制后,它将存储在磁盘。”。它“的操作方式与”完全一样TemporaryFile,“一旦关闭就会被销毁close(包括对象被垃圾收集时的隐式销毁)”。因此,似乎一旦file读取了的内容boto3,该文件就会关闭,进而导致该文件被删除。
如果服务器支持它,您可以使用 读取文件内容\xe2\x80\x94 ,如本答案contents = file.file.read()所示(或读取/写入请参阅此处)\xe2\x80\x94,然后将这些(即)直接上传到您的服务器。asynccontentsbytes
否则,您可以再次读取contents文件的参考点,然后将其移动到文件的开头。在文件中,有一个内部“光标”(或“文件指针”),表示读取(或写入)文件内容的位置。调用时read()会一直读取到缓冲区末尾,在光标之外留下零字节。因此,还可以使用该seek()方法将光标的当前位置设置为0(即,将光标倒回到文件的开头);因此,允许您在读取文件内容后传递file对象(即,upload_fileobj(file.file)请参阅此答案) 。
根据FastAPI 的文档:
\n\n\n\n
seek(offset):转到offset (int)文件中的字节位置。\n
\n- 例如,
\nawait myfile.seek(0)将转到文件的开头。- \n
await myfile.read()如果您运行一次然后需要再次读取内容,这尤其有用。
from fastapi import File, UploadFile, HTTPException\n\n@app.post(\'/\')\ndef upload(file: UploadFile = File(...)):\n try:\n contents = file.file.read()\n file.file.seek(0)\n # Upload the file to to your S3 service\n s3_client.upload_fileobj(file.file, \'local\', \'myfile.txt\')\n except Exception:\n raise HTTPException(status_code=500, detail=\'Something went wrong\')\n finally:\n file.file.close()\n\n print(contents) # Handle file contents as desired\n return {"filename": file.filename}\nRun Code Online (Sandbox Code Playgroud)\n将文件的内容复制到NamedTemporaryFile,与 不同的是,它TemporaryFile“在文件系统中具有可见的名称”,“可用于打开文件”(该名称可以从属性 中检索.name)。此外,通过将参数设置为; ,它在关闭后仍然可以访问。因此,允许在需要时重新打开文件。完成后,您可以使用or方法将其删除。下面是一个工作示例(受此答案启发):deleteFalseos.remove()os.unlink()
from fastapi import FastAPI, File, UploadFile, HTTPException\nfrom tempfile import NamedTemporaryFile\nimport os\n\napp = FastAPI()\n\n@app.post("/upload")\ndef upload_file(file: UploadFile = File(...)):\n temp = NamedTemporaryFile(delete=False)\n try:\n try:\n contents = file.file.read()\n with temp as f:\n f.write(contents);\n except Exception:\n raise HTTPException(status_code=500, detail=\'Error on uploading the file\')\n finally:\n file.file.close()\n \n # Upload the file to your S3 service using `temp.name`\n s3_client.upload_file(temp.name, \'local\', \'myfile.txt\')\n \n except Exception:\n raise HTTPException(status_code=500, detail=\'Something went wrong\')\n finally:\n #temp.close() # the `with` statement above takes care of closing the file\n os.remove(temp.name) # Delete temp file\n \n print(contents) # Handle file contents as desired\n return {"filename": file.filename}\nRun Code Online (Sandbox Code Playgroud)\n您甚至可以将字节保存在内存缓冲区BytesIO中,使用它将内容上传到 S3 存储桶,最后关闭它(“调用该方法时,缓冲区将被丢弃close()。”)。请记住seek(0)在完成对 BytesIO 流的写入后调用方法将光标重置回文件的开头。
contents = file.file.read()\ntemp_file = io.BytesIO()\ntemp_file.write(contents)\ntemp_file.seek(0)\ns3_client.upload_fileobj(temp_file, "local", "myfile.txt")\ntemp_file.close()\nRun Code Online (Sandbox Code Playgroud)\n
来自 FastAPI导入文件:
从 fastapi 导入文件和 UploadFile:
from fastapi import FastAPI, File, UploadFile
app = FastAPI()
@app.post("/files/")
async def create_file(file: bytes = File(...)):
return {"file_size": len(file)}
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
return {"filename": file.filename}
Run Code Online (Sandbox Code Playgroud)
来自 FastAPI UploadFile:
例如,在异步路径操作函数内部,您可以通过以下方式获取内容:
contents = await myfile.read()
Run Code Online (Sandbox Code Playgroud)
用你的代码你应该有这样的东西:
async def process(file: UploadFile = File(...)):
content = await file.read()
reader = csv.reader(iterdecode(content, "utf-8"), dialect="excel")
datarows = []
for row in reader:
datarows.append(row)
return datarows
Run Code Online (Sandbox Code Playgroud)