Gre*_*een 17 python gunicorn python-multiprocessing pytorch
我正在使用 torch、gunicorn 和 Flask 创建一个应该使用 CUDA 的推理服务。为了减少资源需求,我使用了gunicorn的预加载选项,因此模型在工作进程之间共享。然而,这会导致 CUDA 出现问题。下面的代码片段显示了一个最小的重现示例:
from flask import Flask, request
import torch
app = Flask('dummy')
model = torch.rand(500)
model = model.to('cuda:0')
@app.route('/', methods=['POST'])
def f():
data = request.get_json()
x = torch.rand((data['number'], 500))
x = x.to('cuda:0')
res = x * model
return {
"result": res.sum().item()
}
Run Code Online (Sandbox Code Playgroud)
使用 启动服务器CUDA_VISIBLE_DEVICES=1 gunicorn -w 3 -b $HOST_IP:8080 --preload run_server:app可以让服务成功启动。然而,一旦执行第一个请求(curl -X POST -d '{"number": 1}'),工作人员就会抛出以下错误:
[2022-06-28 09:42:00,378] ERROR in app: Exception on / [POST]
Traceback (most recent call last):
File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/user/.local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/user/project/run_server.py", line 14, in f
x = x.to('cuda:0')
File "/home/user/.local/lib/python3.6/site-packages/torch/cuda/__init__.py", line 195, in _lazy_init
"Cannot re-initialize CUDA in forked subprocess. " + msg)
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
Run Code Online (Sandbox Code Playgroud)
我在父进程中加载模型,并且每个分叉的工作进程都可以访问它。在工作进程中创建 CUDA 支持的张量时会出现此问题。这会在工作进程中重新初始化 CUDA 上下文,但会失败,因为它已在父进程中初始化。如果我们设置x = data['number']并删除x = x.to('cuda:0'),则推理成功。
添加torch.multiprocessing.set_start_method('spawn')或multiprocessing.set_start_method('spawn')不会改变任何东西,可能是因为gunicornfork在启动该--preload选项时肯定会使用。
解决方案可能是不使用该--preload选项,这会导致内存/GPU 中出现模型的多个副本。但这正是我试图避免的。
是否有可能解决这个问题,而无需在每个工作进程中单独加载模型?
正如 @Newbie 的评论中正确指出的那样,问题不是模型本身,而是 CUDA 上下文。当新的子进程被分叉时,父进程的内存以只读方式与子进程共享,但 CUDA 上下文不支持这种共享,必须将其复制给子进程。因此,它报告上述错误。
Spawn代替Fork要解决此问题,我们必须将子进程的启动方法从 更改fork为spawnwith multiprocessing.set_start_method。下面的简单示例可以正常工作:
import torch
import torch.multiprocessing as mp
def f(y):
y[0] = 1000
if __name__ == '__main__':
x = torch.zeros(1).cuda()
x.share_memory_()
mp.set_start_method('spawn')
p = mp.Process(target=f, args=(x,), daemon=True)
p.start()
p.join()
print("x =", x.item())
Run Code Online (Sandbox Code Playgroud)
运行此代码时,会初始化第二个 CUDA 上下文(可以通过watch -n 1 nvidia-smi第二个窗口观察到),并f在上下文完全初始化后执行。之后,x = 1000.0打印在控制台上,因此,我们确认张量x已在进程之间成功共享。
但是,Gunicorn 在内部用于os.fork启动工作进程,因此multiprocessing.set_start_method对 Gunicorn 的行为没有影响。因此,必须避免在根进程中初始化 CUDA 上下文。
为了在工作进程之间共享模型,我们必须在一个进程中加载模型并与工作进程共享。幸运的是,通过 a 将 CUDA 张量发送torch.multiprocessing.Queue到另一个进程不会复制 GPU 上的参数,因此我们可以使用这些队列来解决这个问题。
import time
import torch
import torch.multiprocessing as mp
def f(q):
y = q.get()
y[0] = 1000
def g(q):
x = torch.zeros(1).cuda()
x.share_memory_()
q.put(x)
q.put(x)
while True:
time.sleep(1) # this process must live as long as x is in use
if __name__ == '__main__':
queue = mp.Queue()
pf = mp.Process(target=f, args=(queue,), daemon=True)
pf.start()
pg = mp.Process(target=g, args=(queue,), daemon=True)
pg.start()
pf.join()
x = queue.get()
print("x =", x.item()) # Prints x = 1000.0
Run Code Online (Sandbox Code Playgroud)
对于 Gunicorn 服务器,我们可以使用相同的策略:模型服务器进程加载模型并在其分叉后将其提供给每个新的工作进程。在挂钩中,post_fork工作人员从模型服务器请求并接收模型。Gunicorn 配置可能如下所示:
import logging
from client import request_model
from app import app
logging.basicConfig(level=logging.INFO)
bind = "localhost:8080"
workers = 1
zmq_url = "tcp://127.0.0.1:5555"
def post_fork(server, worker):
app.config['MODEL'], app.config['COUNTER'] = request_model(zmq_url)
Run Code Online (Sandbox Code Playgroud)
在post_fork钩子中,我们调用request_model从模型服务器获取模型并将模型存储在 Flask 应用程序的配置中。该方法request_model在我的示例文件中定义client.py,定义如下:
import logging
import os
from torch.multiprocessing.reductions import ForkingPickler
import zmq
def request_model(zmq_url: str):
logging.info("Connecting")
context = zmq.Context()
with context.socket(zmq.REQ) as socket:
socket.connect(zmq_url)
logging.info("Sending request")
socket.send(ForkingPickler.dumps(os.getpid()))
logging.info("Waiting for a response")
model = ForkingPickler.loads(socket.recv())
logging.info("Got response from object server")
return model
Run Code Online (Sandbox Code Playgroud)
我们在这里使用ZeroMQ进行进程间通信,因为它允许我们通过名称/地址引用服务器并将服务器代码外包到其自己的应用程序中。multiprocessing.Queue显然与multiprocessing.ProcessGunicorn配合得不好。multiprocessing.Queue在内部使用ForkingPickler来序列化对象,并且该模块torch.multiprocessing以可以适当且可靠地序列化 Torch 数据结构的方式更改它。因此,我们使用此类来序列化我们的模型,以将其发送到工作进程。
该模型在与 Gunicorn 完全独立的应用程序中加载并提供服务,并在中定义server.py:
from argparse import ArgumentParser
import logging
import torch
from torch.multiprocessing.reductions import ForkingPickler
import zmq
def load_model():
model = torch.nn.Linear(10000, 50000)
model.cuda()
model.share_memory()
counter = torch.zeros(1).cuda()
counter.share_memory_()
return model, counter
def share_object(obj, url):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind(url)
while True:
logging.info("Waiting for requests on %s", url)
message = socket.recv()
logging.info("Got a message from %d", ForkingPickler.loads(message))
socket.send(ForkingPickler.dumps(obj))
if __name__ == '__main__':
parser = ArgumentParser(description="Serve model")
parser.add_argument("--listen-address", default="tcp://127.0.0.1:5555")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
logging.info("Loading model")
model = load_model()
share_object(model, args.listen_address)
Run Code Online (Sandbox Code Playgroud)
对于此测试,我们使用大小约为 2GB 的模型来查看对 GPU 内存分配的影响,nvidia-smi并使用一个小张量来验证数据是否确实在进程之间共享。
我们的示例 Flask 应用程序使用随机输入运行模型,计算请求数量并返回两个结果:
from flask import Flask
import torch
app = Flask(__name__)
@app.route("/", methods=["POST"])
def infer():
model: torch.nn.Linear = app.config['MODEL']
counter: torch.Tensor = app.config['COUNTER']
counter[0] += 1 # not thread-safe
input_features = torch.rand(model.in_features).cuda()
return {
"result": model(input_features).sum().item(),
"counter": counter.item()
}
Run Code Online (Sandbox Code Playgroud)
该示例可以按如下方式运行:
$ python server.py &
INFO:root:Waiting for requests on tcp://127.0.0.1:5555
$ gunicorn -c config.py app:app
[2023-02-01 16:45:34 +0800] [24113] [INFO] Starting gunicorn 20.1.0
[2023-02-01 16:45:34 +0800] [24113] [INFO] Listening at: http://127.0.0.1:8080 (24113)
[2023-02-01 16:45:34 +0800] [24113] [INFO] Using worker: sync
[2023-02-01 16:45:34 +0800] [24186] [INFO] Booting worker with pid: 24186
INFO:root:Connecting
INFO:root:Sending request
INFO:root:Waiting for a response
INFO:root:Got response from object server
Run Code Online (Sandbox Code Playgroud)
使用nvidia-smi,我们可以观察到现在有两个进程正在使用 GPU,其中一个进程比另一个进程多分配 2GB 的 VRAM。查询 Flask 应用程序也按预期工作:
$ curl -X POST localhost:8080
{"counter":1.0,"result":-23.956459045410156}
$ curl -X POST localhost:8080
{"counter":2.0,"result":-8.161510467529297}
$ curl -X POST localhost:8080
{"counter":3.0,"result":-37.823692321777344}
Run Code Online (Sandbox Code Playgroud)
让我们引入一些混乱并终止我们唯一的 Gunicorn 工人:
$ kill 24186
[2023-02-01 18:02:09 +0800] [24186] [INFO] Worker exiting (pid: 24186)
[2023-02-01 18:02:09 +0800] [4196] [INFO] Booting worker with pid: 4196
INFO:root:Connecting
INFO:root:Sending request
INFO:root:Waiting for a response
INFO:root:Got response from object server
Run Code Online (Sandbox Code Playgroud)
它正在正确重新启动并准备好响应我们的请求。
最初,我们的服务所需的 VRAM 量为(SizeOf(Model) + SizeOf(CUDA context)) * Num(Workers)。通过共享模型的权重,我们可以将其减少SizeOf(Model) * (Num(Workers) - 1)到SizeOf(Model) + SizeOf(CUDA context) * Num(Workers)。
这种方法的可靠性依赖于单一模型服务器进程。如果该进程终止,不仅新启动的工作程序会陷入困境,而且现有工作程序中的模型将变得不可用,并且所有工作程序都会立即崩溃。共享张量/模型仅在服务器进程运行时可用。即使模型服务器和Gunicorn工作人员重新启动,短暂的中断肯定是不可避免的。因此,在生产环境中,您应该确保该服务器进程保持活动状态。
此外,在不同进程之间共享数据可能会产生副作用。共享可变数据时,必须使用适当的锁来避免竞争条件。
| 归档时间: |
|
| 查看次数: |
25597 次 |
| 最近记录: |