如何在FastAPI中上传文件,然后上传到Amazon S3并最终处理它?

Ari*_*iel 7 python file amazon-s3 boto3 fastapi

我有一个 FastAPI 端点,它接收文件,将其上传到 s3,然后对其进行处理。除了处理失败外,一切正常,并显示以下消息:

  File "/usr/local/lib/python3.9/site-packages/starlette/datastructures.py", line 441, in read
    return self.file.read(size)
  File "/usr/local/lib/python3.9/tempfile.py", line 735, in read
    return self._file.read(*args)
ValueError: I/O operation on closed file.
Run Code Online (Sandbox Code Playgroud)

我的简化代码如下所示:

async def process(file: UploadFile):
    reader = csv.reader(iterdecode(file.file.read(), "utf-8"), dialect="excel")  # This fails!
    datarows = []
    for row in reader:
        datarows.append(row)
    return datarows
Run Code Online (Sandbox Code Playgroud)

如何读取上传文件的内容?

更新

我设法进一步隔离问题。这是我的简化端点:

import boto3
from loguru import logger
from botocore.exceptions import ClientError


UPLOAD = True

@router.post("/")
async def upload(file: UploadFile = File(...)):
    if UPLOAD:
        # Upload the file
        s3_client = boto3.client("s3", endpoint_url="http://localstack:4566")
        try:
            s3_client.upload_fileobj(file.file, "local", "myfile.txt")
        except ClientError as e:
            logger.error(e)
    contents = await file.read()
    return JSONResponse({"message": "Success!"})
Run Code Online (Sandbox Code Playgroud)

如果UPLOAD是 True,我会收到错误。如果不是,一切正常。看来 boto3 正在上传文件后关闭该文件。有什么办法可以重新打开该文件吗?或发送一份副本至upload_fileobj

Chr*_*ris 15

FastAPI(实际上是 Starlette)UploadFile(也请参阅Starlette 的文档)使用 Python SpooledTemporaryFile,“存储在内存中的文件达到最大大小限制,超过此限制后,它将存储在磁盘。”。它“的操作方式与”完全一样TemporaryFile,“一旦关闭就会被销毁close(包括对象被垃圾收集时的隐式销毁)”。因此,似乎一旦file读取了的内容boto3,该文件就会关闭,进而导致该文件被删除。

\n

选项1

\n

如果服务器支持它,您可以使用 读取文件内容\xe2\x80\x94 ,如本答案contents = file.file.read()所示(或读取/写入请参阅此处)\xe2\x80\x94,然后将这些(即)直接上传到您的服务器。asynccontentsbytes

\n

否则,您可以再次读取contents文件的参考点,然后将其移动到文件的开头。在文件中,有一个内部“光标”(或“文件指针”),表示读取(或写入)文件内容的位置。调用时read()会一直读取到缓冲区末尾,在光标之外留下零字节。因此,还可以使用该seek()方法将光标的当前位置设置为0(即,将光标倒回到文件的开头);因此,允许您在读取文件内容传递file对象(即,upload_fileobj(file.file)请参阅此答案) 。

\n

根据FastAPI 的文档

\n
\n

seek(offset):转到offset (int)文件中的字节位置。

\n
    \n
  • 例如,await myfile.seek(0)将转到文件的开头。
  • \n
  • await myfile.read()如果您运行一次然后需要再次读取内容,这尤其有用。
  • \n
\n
\n

例子

\n
from fastapi import File, UploadFile, HTTPException\n\n@app.post(\'/\')\ndef upload(file: UploadFile = File(...)):\n    try:\n        contents = file.file.read()\n        file.file.seek(0)\n        # Upload the file to to your S3 service\n        s3_client.upload_fileobj(file.file, \'local\', \'myfile.txt\')\n    except Exception:\n        raise HTTPException(status_code=500, detail=\'Something went wrong\')\n    finally:\n        file.file.close()\n\n   print(contents)  # Handle file contents as desired\n   return {"filename": file.filename}\n
Run Code Online (Sandbox Code Playgroud)\n

选项2

\n

将文件的内容复制到NamedTemporaryFile,与 不同的是,它TemporaryFile“在文件系统中具有可见的名称”,“可用于打开文件”(该名称可以从属性 中检索.name)。此外,通过将参数设置为; ,它在关闭后仍然可以访问。因此,允许在需要时重新打开文件。完成后,您可以使用or方法将其删除。下面是一个工作示例(受此答案启发):deleteFalseos.remove()os.unlink()

\n
from fastapi import FastAPI, File, UploadFile, HTTPException\nfrom tempfile import NamedTemporaryFile\nimport os\n\napp = FastAPI()\n\n@app.post("/upload")\ndef upload_file(file: UploadFile = File(...)):\n    temp = NamedTemporaryFile(delete=False)\n    try:\n        try:\n            contents = file.file.read()\n            with temp as f:\n                f.write(contents);\n        except Exception:\n            raise HTTPException(status_code=500, detail=\'Error on uploading the file\')\n        finally:\n            file.file.close()\n            \n        # Upload the file to your S3 service using `temp.name`\n        s3_client.upload_file(temp.name, \'local\', \'myfile.txt\')\n        \n    except Exception:\n        raise HTTPException(status_code=500, detail=\'Something went wrong\')\n    finally:\n        #temp.close()  # the `with` statement above takes care of closing the file\n        os.remove(temp.name)  # Delete temp file\n    \n    print(contents)  # Handle file contents as desired\n    return {"filename": file.filename}\n
Run Code Online (Sandbox Code Playgroud)\n

选项3

\n

您甚至可以将字节保存在内存缓冲区BytesIO中,使用它将内容上传到 S3 存储桶,最后关闭它(“调用该方法时,缓冲区将被丢弃close()。”)。请记住seek(0)在完成对 BytesIO 流的写入后调用方法将光标重置回文件的开头。

\n
contents = file.file.read()\ntemp_file = io.BytesIO()\ntemp_file.write(contents)\ntemp_file.seek(0)\ns3_client.upload_fileobj(temp_file, "local", "myfile.txt")\ntemp_file.close()\n
Run Code Online (Sandbox Code Playgroud)\n


Bas*_*n B 1

来自 FastAPI导入文件

从 fastapi 导入文件和 UploadFile:

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/files/")
async def create_file(file: bytes = File(...)):
    return {"file_size": len(file)}


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    return {"filename": file.filename}
Run Code Online (Sandbox Code Playgroud)

来自 FastAPI UploadFile

例如,在异步路径操作函数内部,您可以通过以下方式获取内容:

contents = await myfile.read()
Run Code Online (Sandbox Code Playgroud)

用你的代码你应该有这样的东西:

async def process(file: UploadFile = File(...)):
    content = await file.read()
    reader = csv.reader(iterdecode(content, "utf-8"), dialect="excel")
    datarows = []
    for row in reader:
        datarows.append(row)
    return datarows
Run Code Online (Sandbox Code Playgroud)