Yuq*_*ang 11 python file-upload starlette fastapi python-requests-toolbelt
我正在尝试将一个大文件 (\xe2\x89\xa53GB) 上传到我的 FastAPI 服务器,而不将整个文件加载到内存中,因为我的服务器只有 2GB 可用内存。
\n服务器端:
\nasync def uploadfiles(upload_file: UploadFile = File(...):\n
Run Code Online (Sandbox Code Playgroud)\n客户端:
\nm = MultipartEncoder(fields = {"upload_file":open(file_name,\'rb\')})\nprefix = "http://xxx:5000"\nurl = "{}/v1/uploadfiles".format(prefix)\ntry:\n req = requests.post(\n url,\n data=m,\n verify=False,\n )\n
Run Code Online (Sandbox Code Playgroud)\n返回:
\nasync def uploadfiles(upload_file: UploadFile = File(...):\n
Run Code Online (Sandbox Code Playgroud)\n我不确定MultipartEncoder
实际发送到服务器的内容,因此请求不匹配。有任何想法吗?
Chr*_*ris 33
对于库,在声明for时,requests-toolbelt
您还必须传递,并设置标头\xe2\x80\x94 这是出现错误的主要原因,因为您发送请求时没有将标头设置为,后跟必要的字符串\xe2\x80\x94,如文档中所示。例子:filename
field
upload_file
Content-Type
Content-Type
multipart/form-data
boundary
filename = \'my_file.txt\'\nm = MultipartEncoder(fields={\'upload_file\': (filename, open(filename, \'rb\'))})\nr = requests.post(url, data=m, headers={\'Content-Type\': m.content_type})\nprint(r.request.headers) # confirm that the \'Content-Type\' header has been set\n
Run Code Online (Sandbox Code Playgroud)\n但是,我不建议使用requests-toolbelt
三年多没有提供新版本的库(即 )。我建议改用Python请求,如这个答案和那个答案所示(另请参阅流式上传和块编码请求),或者最好使用HTTPX
支持async
请求的库(如果您必须同时发送多个请求),以及File
默认情况下的流式上传,这意味着一次只会将一个块加载到内存中(请参阅文档)。下面给出了示例。
File
和Form
使用数据.stream()
正如之前在这个答案中详细解释的那样,当您声明一个UploadFile
对象时,FastAPI/Starlette 在底层使用SpooledTemporaryFile
属性max_size
设置为1MB的 ,这意味着文件数据将在内存中假脱机存储,直到文件大小超过max_size
,此时点内容写入磁盘;更具体地说,到temporary
操作系统临时目录\xe2\x80\x94上的文件,请参阅此答案,了解如何查找/更改默认临时目录\xe2\x80\x94,您稍后需要使用以下命令从中读取数据.read()
方法。因此,整个过程使得上传文件变得非常慢;特别是,如果它是一个大文件(正如您稍后将在下面的选项 2 中看到的那样)。
为了避免这种情况并加快进程,正如上面链接的答案所建议的那样,可以将正文request
作为流访问。根据Starlette 文档,如果您使用该.stream()
方法,则提供(请求)字节块,而不将整个主体存储到内存(如果主体大小超过 1MB,则稍后存储到临时文件)。此方法允许您在字节块到达时读取和处理它们。下面通过使用该库进一步提出了建议的解决方案,streaming-form-data
该库提供了一个用于解析流multipart/form-data
输入块的 Python 解析器。这意味着您不仅可以Form
随 一起上传数据File(s)
,而且不必等待接收到整个请求正文才能开始解析数据。完成的方式是初始化主解析器类(传递headers
有助于确定输入的HTTP 请求Content-Type
,从而确定boundary
用于分隔多部分有效负载中的每个正文部分等的字符串),并将一个关联类的Target
定义,用于定义从请求正文中提取字段后应如何处理该字段。例如,FileTarget
将数据流式传输到磁盘上的文件,而将数据保存在内存中(如果您不需要将文件保存到磁盘,则ValueTarget
此类也可用于 或Form
数据) File
。也可以定义您自己的自定义Target
类。我必须提到的是,该streaming-form-data
库当前不支持async
对 I/O 操作的调用,这意味着块的写入会sync
定期发生(在def
函数内)。不过,正如下面的端点所使用的.stream()
那样(这是一个async
函数),它将放弃对在事件循环上运行的其他任务/请求的控制,同时等待数据从流中变得可用。await
您还可以使用 Starlette 的 \ run_in_threadpool()
xe2\x80\x94 例如, \xe2\x80\x94在单独的线程中运行解析接收到的数据的函数,当您调用以下方法await run_in_threadpool(parser.data_received, chunk)
时,FastAPI 在内部使用它,如此处所示。有关vs的更多详细信息,请查看此答案。async
UploadFile
def
async def
您还可以执行某些验证任务,例如,确保输入大小不超过特定值。这可以使用 来完成MaxSizeValidator
。但是,由于这只会应用于您定义的字段\xe2\x80\x94,因此,它不会阻止恶意用户发送极大的请求正文,这可能会导致应用程序消耗服务器资源可能最终会崩溃\xe2\x80\x94下面包含一个自定义MaxBodySizeValidator
类,用于确保请求正文大小不超过预定义值。上面描述的两个验证器都以一种可能比这里描述的更好的方式解决了限制上传文件(以及整个请求正文)大小的问题,它使用UploadFile
,因此需要完全接收文件并将其保存到临时目录,在执行检查之前(更不用说该方法根本不考虑请求主体大小)\xe2\x80\x94,使用 ASGI 中间件,例如这将是限制请求主体的替代解决方案。此外,如果您将Gunicorn 与 Uvicorn 一起使用,您还可以定义限制,例如请求中 HTTP 标头字段的数量、HTTP 请求标头字段的大小等(请参阅文档) 。使用反向代理服务器时可以应用类似的限制,例如 Nginx(它还允许您使用client_max_body_size
)。
以下示例的一些注释。由于它Request
直接使用对象,而不是UploadFile
和Form
对象,因此端点不会在自动生成的文档中正确记录/docs
(如果这对您的应用程序很重要)。这也意味着您必须自己执行一些检查,例如是否收到端点的必填字段,以及它们是否采用预期格式。例如,对于该data
字段,您可以检查是否data.value
为空(空意味着用户未在 中包含该字段multipart/form-data
,或者发送了空值),以及 if isinstance(data.value, str)
。对于文件,可以检查是否file_.multipart_filename
不为空;但是,由于某些用户filename
可能不会将 a 包含在 a 中Content-Disposition
,因此您可能还想检查文件系统中是否存在该文件,使用os.path.isfile(filepath)
(注意:您需要确保在指定的位置;否则,上述函数将始终返回True
,即使用户没有发送文件)。
关于应用的大小限制,MAX_REQUEST_BODY_SIZE
以下内容必须大于您期望收到的MAX_FILE_SIZE
(加上所有值的大小),因为原始请求正文(您从使用该方法获得的)包含更多字节和每个的标头体内的场。因此,您应该添加更多字节,具体取决于您期望接收的值和文件数量(因此如下)。Form
.stream()
--boundary
Content-Disposition
Form
MAX_FILE_SIZE + 1024
应用程序.py
\nfrom fastapi import FastAPI, Request, HTTPException, status\nfrom streaming_form_data import StreamingFormDataParser\nfrom streaming_form_data.targets import FileTarget, ValueTarget\nfrom streaming_form_data.validators import MaxSizeValidator\nimport streaming_form_data\nfrom starlette.requests import ClientDisconnect\nimport os\n\nMAX_FILE_SIZE = 1024 * 1024 * 1024 * 4 # = 4GB\nMAX_REQUEST_BODY_SIZE = MAX_FILE_SIZE + 1024\n\napp = FastAPI()\n\nclass MaxBodySizeException(Exception):\n def __init__(self, body_len: str):\n self.body_len = body_len\n\nclass MaxBodySizeValidator:\n def __init__(self, max_size: int):\n self.body_len = 0\n self.max_size = max_size\n\n def __call__(self, chunk: bytes):\n self.body_len += len(chunk)\n if self.body_len > self.max_size:\n raise MaxBodySizeException(body_len=self.body_len)\n \n@app.post(\'/upload\')\nasync def upload(request: Request):\n body_validator = MaxBodySizeValidator(MAX_REQUEST_BODY_SIZE)\n filename = request.headers.get(\'Filename\')\n \n if not filename:\n raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, \n detail=\'Filename header is missing\')\n try:\n filepath = os.path.join(\'./\', os.path.basename(filename)) \n file_ = FileTarget(filepath, validator=MaxSizeValidator(MAX_FILE_SIZE))\n data = ValueTarget()\n parser = StreamingFormDataParser(headers=request.headers)\n parser.register(\'file\', file_)\n parser.register(\'data\', data)\n \n async for chunk in request.stream():\n body_validator(chunk)\n parser.data_received(chunk)\n except ClientDisconnect:\n print("Client Disconnected")\n except MaxBodySizeException as e:\n raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, \n detail=f\'Maximum request body size limit ({MAX_REQUEST_BODY_SIZE} bytes) exceeded ({e.body_len} bytes read)\')\n except streaming_form_data.validators.ValidationError:\n raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, \n detail=f\'Maximum file size limit ({MAX_FILE_SIZE} bytes) exceeded\') \n except Exception:\n raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, \n detail=\'There was an error uploading the file\') \n \n if not file_.multipart_filename:\n raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=\'File is missing\')\n\n print(data.value.decode())\n print(file_.multipart_filename)\n \n return {"message": f"Successfuly uploaded {filename}"}\n
Run Code Online (Sandbox Code Playgroud)\n如前所述,要上传数据(在客户端),您可以使用该HTTPX
库,该库默认支持流式文件上传,因此允许您发送大型流/文件,而无需将它们完全加载到内存中。您Form
还可以使用data
参数传递其他数据。下面,使用自定义标头,即Filename
,将文件名传递给服务器,以便服务器FileTarget
用该名称实例化类(如果您愿意,可以使用X-
自定义标头的前缀;但是,官方不建议这样做不再)。
要上传多个文件,请为每个文件使用标头(或者,在服务器端使用随机名称,文件完全上传后,您可以选择使用属性重命名它file_.multipart_filename
),传递文件列表,如文档(注意:为每个文件使用不同的字段名称,以便在服务器端解析它们时它们不会重叠,例如,files = [(\'file\', open(\'bigFile.zip\', \'rb\')),(\'file_2\', open(\'bigFile2.zip\', \'rb\'))]
最后,Target
相应地在服务器端定义类。
测试.py
\nimport httpx\nimport time\n\nurl =\'http://127.0.0.1:8000/upload\'\nfiles = {\'file\': open(\'bigFile.zip\', \'rb\')}\nheaders={\'Filename\': \'bigFile.zip\'}\ndata = {\'data\': \'Hello World!\'}\n\nwith httpx.Client() as client:\n start = time.time()\n r = client.post(url, data=data, files=files, headers=headers)\n end = time.time()\n print(f\'Time elapsed: {end - start}s\')\n print(r.status_code, r.json(), sep=\' \')\n
Run Code Online (Sandbox Code Playgroud)\nFile
和JSON
正文如果您想上传文件和 JSON 而不是数据,您可以使用此答案Form
的方法 3 中描述的方法,从而也使您无需对接收到的字段执行手动检查,如前所述(请参阅链接答案以获取更多详细信息)。为此,请对上面的代码进行以下更改。Form
应用程序.py
\n#...\nfrom fastapi import Form\nfrom pydantic import BaseModel, ValidationError\nfrom typing import Optional\nfrom fastapi.encoders import jsonable_encoder\n\n#...\n\nclass Base(BaseModel):\n name: str\n point: Optional[float] = None\n is_accepted: Optional[bool] = False\n \ndef checker(data: str = Form(...)):\n try:\n return Base.parse_raw(data)\n except ValidationError as e:\n raise HTTPException(detail=jsonable_encoder(e.errors()), status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)\n \n\n@app.post(\'/upload\')\nasync def upload(request: Request):\n #...\n \n # place the below after the try-except block in the example given earlier\n model = checker(data.value.decode())\n print(dict(model))\n
Run Code Online (Sandbox Code Playgroud)\n测试.py
\n#...\nimport json\n\ndata = {\'data\': json.dumps({"name": "foo", "point": 0.13, "is_accepted": False})}\n#...\n
Run Code Online (Sandbox Code Playgroud)\nFile
和Form
数据UploadFile
Form
如果您想使用普通def
端点,请参阅此答案。
应用程序.py
\nfrom fastapi import FastAPI, File, UploadFile, Form, HTTPException, status\nimport aiofiles\nimport os\n\nCHUNK_SIZE = 1024 * 1024 # adjust the chunk size as desired\napp = FastAPI()\n\n@app.post("/upload")\nasync def upload(file: UploadFile = File(...), data: str = Form(...)):\n try:\n filepath = os.path.join(\'./\', os.path.basename(file.filename))\n async with aiofiles.open(filepath, \'wb\') as f:\n while chunk := await file.read(CHUNK_SIZE):\n await f.write(chunk)\n except Exception:\n raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, \n detail=\'There was an error uploading the file\')\n finally:\n await file.close()\n\n return {"message": f"Successfuly uploaded {file.filename}"}\n
Run Code Online (Sandbox Code Playgroud)\n如前所述,使用此选项将需要更长的时间才能完成文件上传,并且由于HTTPX
使用 5 秒的默认超时,您很可能会遇到异常ReadTimeout
(因为服务器需要一些时间来读取SpooledTemporaryFile
块并写入内容到磁盘上的永久位置)。因此,您可以配置超时(Timeout
也请参阅源代码中的类),更具体地说,read
超时“指定等待接收一大块数据(例如,响应的一大块)的最大持续时间身体)”。如果设置None
为而不是某个正数值,则 不会超时read
。
测试.py
\nimport httpx\nimport time\n\nurl =\'http://127.0.0.1:8000/upload\'\nfiles = {\'file\': open(\'bigFile.zip\', \'rb\')}\nheaders={\'Filename\': \'bigFile.zip\'}\ndata = {\'data\': \'Hello World!\'}\ntimeout = httpx.Timeout(None, read=180.0)\n\nwith httpx.Client(timeout=timeout) as client:\n start = time.time()\n r = client.post(url, data=data, files=files, headers=headers)\n end = time.time()\n print(f\'Time elapsed: {end - start}s\')\n print(r.status_code, r.json(), sep=\' \')\n
Run Code Online (Sandbox Code Playgroud)\n
归档时间: |
|
查看次数: |
19920 次 |
最近记录: |