Mat*_*der 3 file-upload fastapi httpx
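I provisioned and configured a Fedora 34 virtual machine on VirtualBox with 2048 MB of RAM to serve this FastAPI application on localhost:7070. The full application source code, dependency code, and instructions are here. Below is the smallest reproducible example I could make.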
main.py
```python
import os, pathlib
import fastapi as fast
import aiofiles

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_DIR = pathlib.Path('/'.join((ROOT_DIR, 'results')))

app = fast.FastAPI()


@app.post('/api')
async def upload(
        request: fast.Request,
        file: fast.UploadFile = fast.File(...),
        filedir: str = ''):
    dest = RESULTS_DIR.joinpath(filedir, file.filename)
    dest.parent.mkdir(parents=True, exist_ok=True)
    async with aiofiles.open(dest, 'wb') as buffer:
        await file.seek(0)
        contents = await file.read()
        await buffer.write(contents)
    return f'localhost:7070/{dest.parent.name}/{dest.name}'
```
start.sh (starts the server application):
```bash
#! /bin/bash
uvicorn --host "0.0.0.0" --log-level debug --port 7070 main:app
```
client.py
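```python
import asyncio
from pathlib import Path

import httpx


async def async_post_file_req(url: str, filepath: Path) -> httpx.Response:
    # Disable every timeout: large uploads to a small VM can take a while
    timeout = httpx.Timeout(write=None, read=None, connect=None, pool=None)
    async with httpx.AsyncClient(timeout=timeout) as client:
        # Close the file handle once the multipart request has been sent
        with filepath.open('rb') as f:
            r = await client.post(
                url,
                files={'file': (filepath.name, f, 'application/octet-stream')},
            )
    return r


if __name__ == '__main__':
    url = 'http://localhost:7070'
    response = asyncio.run(
        async_post_file_req(
            f'{url}/api',
            # Path() does not expand '~' on its own
            Path('~/1500M.txt').expanduser(),
        ))
    print(response.status_code, response.text)
```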
Create a 1500 MB test file:
```bash
truncate -s 1500M 1500M.txt
```
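If GNU truncate is not available, a file of the same kind can be created from Python. This is a minimal sketch, and make_sparse_file is a hypothetical helper name. GNU truncate reads the M suffix as 1024², so 1500M is 1500 × 1024² bytes. On filesystems that support sparse files, the result takes almost no real disk space, just as with truncate.

```python
import os
from pathlib import Path


def make_sparse_file(path: Path, size_bytes: int) -> Path:
    """Create or resize `path` to exactly size_bytes, like `truncate -s`."""
    path = path.expanduser()
    path.touch(exist_ok=True)       # create the file if it does not exist yet
    os.truncate(path, size_bytes)   # extend with zero bytes (or shrink)
    return path
```

For the test file above, call it as make_sparse_file(Path('~/1500M.txt'), 1500 * 1024 ** 2).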
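When uploading the 1500 MB file, the current implementation of upload appears to read the entire file into memory. The server then responds with {status: 400, reason: 'Bad Request', details: 'There was an error parsing the body.'}, and the file is not written to disk. When uploading an 825 MB file, the server responds with 200 and writes the file to disk. I don't understand why parsing the larger file fails.
What is going on?
How can I upload files larger than the machine's available memory?
Do I have to stream the body?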
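The upload handler in main.py calls await file.read() with no size argument, which loads the whole upload into a single bytes object. Below is a sketch of a chunked copy. It assumes any object with an async read(size) method, such as Starlette's UploadFile. The names save_upload and CHUNK_SIZE, and the 1 MiB chunk size, are hypothetical choices. The sketch keeps at most one chunk in the handler's memory. It does not change the fact that Starlette has already spooled the whole upload into a temporary file before the handler runs.

```python
from pathlib import Path

CHUNK_SIZE = 1024 * 1024  # hypothetical: copy 1 MiB at a time


async def save_upload(upload, dest: Path, chunk_size: int = CHUNK_SIZE) -> int:
    """Copy `upload` (anything with `async read(size)`) to `dest` chunk by chunk.

    Returns the number of bytes written.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    written = 0
    # Plain blocking writes keep the sketch dependency-free; aiofiles, as used
    # in main.py, would avoid blocking the event loop on slow disks.
    with dest.open('wb') as out:
        while True:
            chunk = await upload.read(chunk_size)
            if not chunk:          # b'' signals end of file
                break
            out.write(chunk)
            written += len(chunk)
    return written
```

Inside upload, this would replace the read-everything block with await file.seek(0) followed by await save_upload(file, dest).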
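Digging into the source code, I found that FastAPI raises an HTTP exception with status code 400 and the detail "There was an error parsing the body". It raises this while determining whether it needs to read the request form or the body. A FastAPI request is basically a Starlette request, so I reimplemented the FastAPI server application as a Starlette application. I hoped this would bypass the exception handler and give me more information about the problem.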
main.py
```python
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route


async def homepage(request):
    return JSONResponse({'hello': 'world'})


async def upload(request):
    # Parsing the multipart form spools the whole upload into an UploadFile
    form = await request.form()
    # 'file' is the multipart field name sent by client.py
    upload_file = form['file']
    print(type(upload_file))
    filename = upload_file.filename or 'not found'
    contents = await upload_file.read()
    b = len(contents) or -1
    return JSONResponse({
        'filename': filename,
        'bytes': b
    })


app = Starlette(debug=True, routes=[
    Route('/', homepage),
    Route('/api', upload, methods=['POST'])
])
```
Pipfile
```toml
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
starlette = "*"
uvicorn = "*"
uvloop = "*"
httpx = "*"
watchgod = "*"
python-multipart = "*"

[dev-packages]

[requires]
python_version = "3.9"
```
When posting files of 989 MiB or larger, the Starlette application raises OSError 28, "No space left on device". Files of 988 MiB or smaller do not cause the error.
```text
INFO:     10.0.2.2:46996 - "POST /api HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 398, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.9/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/usr/local/lib/python3.9/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 580, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 241, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 52, in app
    response = await func(request)
  File "/home/vagrant/star-file-server/./main.py", line 11, in upload
    form = await request.form()
  File "/usr/local/lib/python3.9/site-packages/starlette/requests.py", line 240, in form
    self._form = await multipart_parser.parse()
  File "/usr/local/lib/python3.9/site-packages/starlette/formparsers.py", line 231, in parse
    await file.write(message_bytes)
  File "/usr/local/lib/python3.9/site-packages/starlette/datastructures.py", line 445, in write
    await run_in_threadpool(self.file.write, data)
  File "/usr/local/lib/python3.9/site-packages/starlette/concurrency.py", line 40, in run_in_threadpool
    return await loop.run_in_executor(None, func, *args)
  File "/usr/lib64/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/lib64/python3.9/tempfile.py", line 755, in write
    rv = file.write(s)
OSError: [Errno 28] No space left on device
```
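Starlette's UploadFile data structure uses a SpooledTemporaryFile, which writes to the operating system's temporary directory. My temporary directory is /tmp. I'm on Fedora 34 and haven't set any environment variable telling Python to use a different temporary directory.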
```text
[vagrant@fedora star-file-server]$ python
Python 3.9.5 (default, May 14 2021, 00:00:00)
[GCC 11.1.1 20210428 (Red Hat 11.1.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tempfile
>>> tempfile.gettempdir()
'/tmp'
```
```text
[vagrant@fedora star-file-server]$ df -h
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs        974M     0  974M   0% /dev
tmpfs           989M  168K  989M   1% /dev/shm
tmpfs           396M  5.6M  390M   2% /run
/dev/sda1        40G  1.6G   36G   5% /
tmpfs           989M     0  989M   0% /tmp
tmpfs           198M   84K  198M   1% /run/user/1000
```
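The Avail column for /tmp is the ceiling that UploadFile's rolled-over temporary files run into. The sketch below reads the same figure from Python using only the standard library, for example to log it at startup or to reject an upload whose Content-Length clearly cannot fit. The names temp_dir_free_bytes and fits_in_temp_dir are hypothetical helpers. A multipart body is slightly larger than the file it carries, so the check is only approximate.

```python
import shutil
import tempfile


def temp_dir_free_bytes() -> int:
    """Free bytes on the filesystem holding Python's temp dir (df's Avail column)."""
    return shutil.disk_usage(tempfile.gettempdir()).free


def fits_in_temp_dir(content_length: int, headroom: int = 0) -> bool:
    """Rough check: could a body of content_length bytes be spooled to the temp dir?"""
    return content_length + headroom <= temp_dir_free_bytes()
```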
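Starlette creates the SpooledTemporaryFile with max_size set to 1 MiB. According to the Python tempfile documentation, the upload is buffered in memory only until it exceeds 1 MiB. After that, it is rolled over into a real file in the temporary directory. So although the in-memory buffer is only 1 MiB, 989 MiB appears to be the effective hard limit for UploadFile, because the rolled-over file is limited by the free space in the system temporary directory. The df output also shows that /tmp is a tmpfs mount. A tmpfs lives in RAM (and swap) rather than on /dev/sda1, and by default it is sized at half of physical memory. That matches its 989M size on this 2048 MiB VM, so the "on-disk" copy of the upload is in fact held in memory.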
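The rollover behaviour described above can be observed directly. This small sketch uses a 1 KiB threshold instead of Starlette's 1 MiB. It inspects _rolled, a private CPython attribute, purely for illustration; that attribute is not part of the public API and may change between Python versions.

```python
import tempfile
from typing import List


def spooled_demo(max_size: int = 1024) -> List[bool]:
    """Record whether the spooled file has rolled over to disk after each write."""
    states = []
    with tempfile.SpooledTemporaryFile(max_size=max_size) as f:
        f.write(b'x' * (max_size // 2))  # still <= max_size: kept in an in-memory buffer
        states.append(f._rolled)         # private CPython attribute, illustration only
        f.write(b'x' * max_size)         # now > max_size: copied into a real temp file
        states.append(f._rolled)
    return states
```

The first write stays in memory and the second triggers the rollover, so the function returns [False, True]. After the rollover, all further data goes to a file in tempfile.gettempdir(), which on this VM is the tmpfs at /tmp.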
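If I still want to use UploadFile, I can set the TMPDIR environment variable to a device that is known to always have enough free space, even for the largest uploads.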
```bash
export TMPDIR=/huge_storage_device
```
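Python's tempfile module reads TMPDIR (then TEMP, then TMP) the first time it needs a temporary directory. It caches the result in tempfile.tempdir, so the variable has to be in the environment of the uvicorn process before it starts. The sketch below shows that lookup. use_temp_dir is a hypothetical helper, and resetting tempfile.tempdir only matters when changing the variable inside an interpreter that is already running.

```python
import os
import tempfile


def use_temp_dir(path: str) -> str:
    """Point tempfile (and therefore SpooledTemporaryFile rollovers) at `path`."""
    os.environ['TMPDIR'] = path
    tempfile.tempdir = None   # drop the cached value so TMPDIR is read again
    return tempfile.gettempdir()
```

For the server in this question, that means putting the export line in start.sh above the uvicorn command.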
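The approach I prefer uses the request's stream(). It avoids writing the file twice: once to the local temporary directory and again to the local permanent directory.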
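```python
import os, pathlib
import fastapi as fast
import aiofiles

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_DIR = pathlib.Path('/'.join((ROOT_DIR, 'results')))

app = fast.FastAPI()


@app.post('/stream')
async def stream(
        request: fast.Request,
        filename: str,       # query parameter: name to save the upload under
        filedir: str = ''):  # query parameter: optional subdirectory of results/
    dest = RESULTS_DIR.joinpath(filedir, filename)
    dest.parent.mkdir(parents=True, exist_ok=True)
    async with aiofiles.open(dest, 'wb') as buffer:
        # Write the raw request body chunk by chunk as it arrives,
        # without buffering the whole upload in memory or in a temp file
        async for chunk in request.stream():
            await buffer.write(chunk)
    return {
        'loc': f'localhost:7070/{dest.parent.name}/{dest.name}'
    }
```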
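I tested this approach by uploading files of 5M, 450M, and 988M, each measured twice, to the server on the Fedora VM with 2048 MiB of memory. The server never used much memory and never crashed. Average latency dropped by 40%: posting to /stream took about 60% as long as posting to /api.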