Cha*_*ker 14 machine-learning multiprocessing deep-learning conv-neural-network pytorch
我想并行处理单个示例或一批示例(在我的情况下,我只有 cpus,我有多达 112 个)。我试过了,但我得到一个错误,即损失不能从单独的进程中产生梯度(这完全破坏了我的尝试)。我仍然想这样做,并且在多重处理发生后我可以做一个优化器步骤是必不可少的。我该如何解决?我做了一个完全独立的例子:
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader
from torch.multiprocessing import Pool
class SimpleDataSet(Dataset):
def __init__(self, Din, num_examples=23):
self.x_dataset = [torch.randn(Din) for _ in range(num_examples)]
# target function is x*x
self.y_dataset = [x**2 for x in self.x_dataset]
def __len__(self):
return len(self.x_dataset)
def __getitem__(self, idx):
return self.x_dataset[idx], self.y_dataset[idx]
def get_loss(args):
x, y, model = args
y_pred = model(x)
criterion = nn.MSELoss()
loss = criterion(y_pred, y)
return loss
def get_dataloader(D, num_workers, batch_size):
ds = SimpleDataSet(D)
dl = DataLoader(ds, batch_size=batch_size, num_workers=num_workers)
return dl
def train_fake_data():
num_workers = 2
Din, Dout = 3, 1
model = nn.Linear(Din, Dout).share_memory()
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
batch_size = 2
num_epochs = 10
# num_batches = 5
num_procs = 5
dataloader = get_dataloader(Din, num_workers, batch_size)
scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
for epoch in range(num_epochs):
for _, batch in enumerate(dataloader):
batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in batch]
with Pool(num_procs) as pool:
optimizer.zero_grad()
losses = pool.map(get_loss, batch)
loss = torch.mean(losses)
loss.backward()
optimizer.step()
# scheduler
scheduler.step()
if __name__ == '__main__':
# start = time.time()
# train()
train_fake_data()
# print(f'execution time: {time.time() - start}')
Run Code Online (Sandbox Code Playgroud)
错误:
Traceback (most recent call last):
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
pydev_imports.execfile(filename, global_vars, local_vars) # execute the script
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
train_fake_data()
File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
losses = pool.map(get_loss, batch)
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries. If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'
Run Code Online (Sandbox Code Playgroud)
我确定我想这样做。我该怎么做?
"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import os
num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]
class OneDeviceModel(nn.Module):
"""
Toy example for a model ran in parallel but not distributed accross gpus
(only processes with their own gpu or hardware)
"""
def __init__(self):
super().__init__()
self.net1 = nn.Linear(Din, Din)
self.relu = nn.ReLU()
self.net2 = nn.Linear(Din, Dout)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def setup_process(rank, world_size, backend='gloo'):
"""
Initialize the distributed environment (for each process).
gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
it's a library/API for process to communicate/coordinate with each other/master. It's a backend library.
"""
# set up the master's ip address so this child process can coordinate
# os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
if torch.cuda.is_available():
backend = 'nccl'
# Initializes the default distributed process group, and this will also initialize the distributed package.
dist.init_process_group(backend, rank=rank, world_size=world_size)
def cleanup():
""" Destroy a given process group, and deinitialize the distributed package """
dist.destroy_process_group()
def run_parallel_training_loop(rank, world_size):
"""
Distributed function to be implemented later.
This is the function that is actually ran in each distributed process.
Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
you don’t need to worry about different DDP processes start from different model parameter initial values.
"""
print()
print(f"Start running DDP with model parallel example on rank: {rank}.")
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
setup_process(rank, world_size)
# create model and move it to GPU with id rank
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
# ddp_model = DDP(model, device_ids=[rank])
ddp_model = DDP(model)
for batch_idx, batch in enumerate(data):
x, y = batch
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(x)
labels = y.to(rank) if torch.cuda.is_available() else y
# Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
loss_fn(outputs, labels).backward() # When the backward() returns, param.grad already contains the synchronized gradient tensor.
optimizer.step() # TODO how does the optimizer know to do the gradient step only once?
print()
print(f"Start running DDP with model parallel example on rank: {rank}.")
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
# Destroy a given process group, and deinitialize the distributed package
cleanup()
def main():
print()
print('running main()')
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
# args
world_size = mp.cpu_count()
mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)
if __name__ == "__main__":
print('starting __main__')
main()
print('Done!\a\n')
Run Code Online (Sandbox Code Playgroud)
它似乎有效,但我的问题在第 74 行,我需要这样做吗
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
Run Code Online (Sandbox Code Playgroud)
或者
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()
Run Code Online (Sandbox Code Playgroud)
让它在多个 CPU 中正常工作?
我当前的问题是当只有 cpu 可用时,cpu 并行作业比串行运行慢。
相关研究链接:
Torch 将使用多个 CPU 来并行化操作,因此您的串行可能正在使用多核矢量化。
\n举这个简单的例子
\nimport torch\nc = 0;\nfor i in range(10000):\n A = torch.randn(1000, 1000, device=\'cpu\');\n B = torch.randn(1000, 1000, device=\'cpu\');\n c += torch.sum(A @ B)\nRun Code Online (Sandbox Code Playgroud)\n没有使用代码来并行化,但是 12 个 CPU 的 80% 采用默认配置。
\n\n您可以使用torch.set_num_threads设置 CPU 上的操作内并行度。特别是如果您正在运行多个进程并且希望每个进程使用单个 CPU,您可能需要将每个进程中的操作内并行度设置为1。
然而,并行化操作是有成本的,我无法详细介绍实现细节,但我们可以运行一个快速实验来显示使用多个线程的开销。
\nimport matplotlib.pyplot as plt\nimport numpy as np\nimport torch;\nimport time;\nA = torch.randn(1000, 1000, device=\'cpu\');\nB = torch.randn(1000, 1000, device=\'cpu\');\nfuncs = {\n \'sin\': lambda a,b: torch.sin(A),\n \'tanh\': lambda a,b: torch.tanh(A),\n \'log\': lambda a,b: torch.log(A),\n \'matmul\': lambda a,b: A @ B.T\n}\nt = np.zeros(20)\nfor k,f in funcs.items():\n for i in range(1, len(t) + 1):\n torch.set_num_threads(i)\n c = 0;\n t0 = time.time();\n for _ in range(100):\n f(A,B)\n tf = time.time()\n t[i-1] = (tf - t0)*i;\n plt.plot(np.arange(1, len(t)+1), t, \'-o\', label=k)\nplt.xlabel(\'Number of threads\')\nplt.legend()\nplt.ylabel(\'Core x time\')\nRun Code Online (Sandbox Code Playgroud)\n\n但如果我们将总 CPU 时间乘以线程数,我们会发现单线程版本效率更高。
\n\n如果您能够通过运行独立进程在更高级别上并行化您的实验,您应该尝试为每个进程使用单个核心,否则每个进程将尝试使用所有 CPU,并且所有进程都将运行非常慢,因为您的系统超载。
\n我故意修改了示例脚本的超参数,以支持多进程。
\n"""\nBased on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html\n\nNote: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use\ndifferent communication backends and are not restricted to being executed on the same machine.\n"""\nimport torch\nfrom torch import nn, optim\nimport torch.distributed as dist\nimport torch.multiprocessing as mp\nfrom torch.nn.parallel import DistributedDataParallel as DDP\nimport argparse\nimport os\n\n# More than one epoch so that the initialization is less significant\n# than compared to the model processing time\nnum_epochs = 10\n# for the experiment select a number that has a lot of divisors\n# as I want to test with equal number of batches\nnum_batches = 16*9*5\n# Uses a larger batch so that more work is done in each process\n# between two gradient synchronizations\n# apparently the intraop optimization is not helping \n# (at least not too much) in the batch dimension\nbatch_size = 10000\n# Use smaller dimensions, so that the intraop parallelization becomes less \n# helpful\nDin, Dout = 3, 5\ndata_x = torch.randn(batch_size, Din)\ndata_y = torch.randn(batch_size, Dout)\ndata = [(i*data_x, i*data_y) for i in range(num_batches)]\n\nclass OneDeviceModel(nn.Module):\n """\n Toy example for a model ran in parallel but not distributed accross gpus\n (only processes with their own gpu or hardware)\n """\n def __init__(self):\n super().__init__()\n # -- Use more layers\n self.net = [nn.Linear(Din, Din) for _ in range(10)]\n # -- Bob: use more complex activation \n self.tanh = nn.Tanh()\n self.sigmoid = nn.Sigmoid()\n self.relu = nn.ReLU()\n self.net2 = nn.Linear(Din, Dout)\n\n def forward(self, x):\n # apply the 10 layers sequentially\n for i in range(10):\n x = self.net[i](x)\n x = self.sigmoid(x)\n x = self.tanh(x)\n x = self.relu(x)\n return self.net2(x)\n\ndef setup_process(rank, world_size, backend=\'gloo\'):\n """\n Initialize the distributed environment (for each process).\n\n gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that\n it\'s a library/API for process to communicate/coordinate with each other/master. It\'s a backend library.\n """\n # set up the master\'s ip address so this child process can coordinate\n # os.environ[\'MASTER_ADDR\'] = \'127.0.0.1\'\n os.environ[\'MASTER_ADDR\'] = \'localhost\'\n os.environ[\'MASTER_PORT\'] = \'12355\'\n\n # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends\n if torch.cuda.is_available():\n backend = \'nccl\'\n # Initializes the default distributed process group, and this will also initialize the distributed package.\n dist.init_process_group(backend, rank=rank, world_size=world_size)\n\ndef cleanup():\n """ Destroy a given process group, and deinitialize the distributed package """\n dist.destroy_process_group()\n\ndef run_parallel_training_loop(rank, world_size):\n """\n Distributed function to be implemented later.\n\n This is the function that is actually ran in each distributed process.\n\n Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,\n you don\xe2\x80\x99t need to worry about different DDP processes start from different model parameter initial values.\n """\n print()\n print(f"Start running DDP with model parallel example on rank: {rank}.")\n print(f\'current process: {mp.current_process()}\')\n print(f\'pid: {os.getpid()}\')\n setup_process(rank, world_size)\n torch.set_num_threads(mp.cpu_count() // world_size)\n # create model and move it to GPU with id rank\n model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()\n # ddp_model = DDP(model, device_ids=[rank])\n ddp_model = DDP(model)\n for _ in range(num_epochs):\n for batch_idx, batch in enumerate(data[rank::world_size]):\n x, y = batch\n loss_fn = nn.MSELoss()\n optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)\n\n optimizer.zero_grad()\n outputs = ddp_model(x)\n labels = y.to(rank) if torch.cuda.is_available() else y\n # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.\n loss_fn(outputs, labels).backward() # When the backward() returns, param.grad already contains the synchronized gradient tensor.\n optimizer.step() # TODO how does the optimizer know to do the gradient step only once?\n\n print()\n print(f"Start running DDP with model parallel example on rank: {rank}.")\n print(f\'current process: {mp.current_process()}\')\n print(f\'pid: {os.getpid()}\')\n # Destroy a given process group, and deinitialize the distributed package\n cleanup()\n\ndef main():\n print()\n print(\'running main()\')\n print(f\'current process: {mp.current_process()}\')\n print(f\'pid: {os.getpid()}\')\n parser = argparse.ArgumentParser()\n parser.add_argument(\'--world-size\', default=1, type=int)\n args = parser.parse_args()\n assert num_batches % args.world_size == 0\n mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)\n\nif __name__ == "__main__":\n print(\'starting __main__\')\n main()\n print(\'Done!\\a\\n\')\nRun Code Online (Sandbox Code Playgroud)\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal 0m59.092s\nuser 8m46.589s\nsys 0m7.320s\n\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal 1m11.124s\nuser 10m54.209s\nsys 0m9.595s\n\n$ time python3 ddp.py --world-size 6 > /dev/null\n\nreal 0m18.348s\nuser 2m28.799s\nsys 0m18.068s\n$ time python3 ddp.py --world-size 12 > /dev/null\n\nreal 0m26.352s\nuser 4m3.074s\nsys 0m39.179s\n$ time python3 ddp.py --world-size 3 > /dev/null\n\nreal 0m23.047s\nuser 3m51.172s\nsys 0m11.483s\n$ time python3 ddp.py --world-size 4 > /dev/null\n\nreal 0m18.195s\nuser 2m55.241s\nsys 0m12.841s\n$ time python3 ddp.py --world-size 2 > /dev/null\n\nreal 0m26.955s\nuser 4m15.837s\nsys 0m7.127s\nRun Code Online (Sandbox Code Playgroud)\n如果我删除该行
\n"""\nBased on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html\n\nNote: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use\ndifferent communication backends and are not restricted to being executed on the same machine.\n"""\nimport torch\nfrom torch import nn, optim\nimport torch.distributed as dist\nimport torch.multiprocessing as mp\nfrom torch.nn.parallel import DistributedDataParallel as DDP\nimport argparse\nimport os\n\n# More than one epoch so that the initialization is less significant\n# than compared to the model processing time\nnum_epochs = 10\n# for the experiment select a number that has a lot of divisors\n# as I want to test with equal number of batches\nnum_batches = 16*9*5\n# Uses a larger batch so that more work is done in each process\n# between two gradient synchronizations\n# apparently the intraop optimization is not helping \n# (at least not too much) in the batch dimension\nbatch_size = 10000\n# Use smaller dimensions, so that the intraop parallelization becomes less \n# helpful\nDin, Dout = 3, 5\ndata_x = torch.randn(batch_size, Din)\ndata_y = torch.randn(batch_size, Dout)\ndata = [(i*data_x, i*data_y) for i in range(num_batches)]\n\nclass OneDeviceModel(nn.Module):\n """\n Toy example for a model ran in parallel but not distributed accross gpus\n (only processes with their own gpu or hardware)\n """\n def __init__(self):\n super().__init__()\n # -- Use more layers\n self.net = [nn.Linear(Din, Din) for _ in range(10)]\n # -- Bob: use more complex activation \n self.tanh = nn.Tanh()\n self.sigmoid = nn.Sigmoid()\n self.relu = nn.ReLU()\n self.net2 = nn.Linear(Din, Dout)\n\n def forward(self, x):\n # apply the 10 layers sequentially\n for i in range(10):\n x = self.net[i](x)\n x = self.sigmoid(x)\n x = self.tanh(x)\n x = self.relu(x)\n return self.net2(x)\n\ndef setup_process(rank, world_size, backend=\'gloo\'):\n """\n Initialize the distributed environment (for each process).\n\n gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that\n it\'s a library/API for process to communicate/coordinate with each other/master. It\'s a backend library.\n """\n # set up the master\'s ip address so this child process can coordinate\n # os.environ[\'MASTER_ADDR\'] = \'127.0.0.1\'\n os.environ[\'MASTER_ADDR\'] = \'localhost\'\n os.environ[\'MASTER_PORT\'] = \'12355\'\n\n # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends\n if torch.cuda.is_available():\n backend = \'nccl\'\n # Initializes the default distributed process group, and this will also initialize the distributed package.\n dist.init_process_group(backend, rank=rank, world_size=world_size)\n\ndef cleanup():\n """ Destroy a given process group, and deinitialize the distributed package """\n dist.destroy_process_group()\n\ndef run_parallel_training_loop(rank, world_size):\n """\n Distributed function to be implemented later.\n\n This is the function that is actually ran in each distributed process.\n\n Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,\n you don\xe2\x80\x99t need to worry about different DDP processes start from different model parameter initial values.\n """\n print()\n print(f"Start running DDP with model parallel example on rank: {rank}.")\n print(f\'current process: {mp.current_process()}\')\n print(f\'pid: {os.getpid()}\')\n setup_process(rank, world_size)\n torch.set_num_threads(mp.cpu_count() // world_size)\n # create model and move it to GPU with id rank\n model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()\n # ddp_model = DDP(model, device_ids=[rank])\n ddp_model = DDP(model)\n for _ in range(num_epochs):\n for batch_idx, batch in enumerate(data[rank::world_size]):\n x, y = batch\n loss_fn = nn.MSELoss()\n optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)\n\n optimizer.zero_grad()\n outputs = ddp_model(x)\n labels = y.to(rank) if torch.cuda.is_available() else y\n # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.\n loss_fn(outputs, labels).backward() # When the backward() returns, param.grad already contains the synchronized gradient tensor.\n optimizer.step() # TODO how does the optimizer know to do the gradient step only once?\n\n print()\n print(f"Start running DDP with model parallel example on rank: {rank}.")\n print(f\'current process: {mp.current_process()}\')\n print(f\'pid: {os.getpid()}\')\n # Destroy a given process group, and deinitialize the distributed package\n cleanup()\n\ndef main():\n print()\n print(\'running main()\')\n print(f\'current process: {mp.current_process()}\')\n print(f\'pid: {os.getpid()}\')\n parser = argparse.ArgumentParser()\n parser.add_argument(\'--world-size\', default=1, type=int)\n args = parser.parse_args()\n assert num_batches % args.world_size == 0\n mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)\n\nif __name__ == "__main__":\n print(\'starting __main__\')\n main()\n print(\'Done!\\a\\n\')\nRun Code Online (Sandbox Code Playgroud)\n$ time python3 ddp.py --world-size 4 > /dev/null\n\nreal 0m40.574s\nuser 6m39.176s\nsys 0m19.025s\n\n$ time python3 ddp.py --world-size 2 > /dev/null\n\nreal 0m28.066s\nuser 3m17.775s\nsys 0m8.410s\n\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal 0m37.114s\nuser 2m19.743s\nsys 0m4.866s\nRun Code Online (Sandbox Code Playgroud)\n使用
\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal 0m59.092s\nuser 8m46.589s\nsys 0m7.320s\n\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal 1m11.124s\nuser 10m54.209s\nsys 0m9.595s\n\n$ time python3 ddp.py --world-size 6 > /dev/null\n\nreal 0m18.348s\nuser 2m28.799s\nsys 0m18.068s\n$ time python3 ddp.py --world-size 12 > /dev/null\n\nreal 0m26.352s\nuser 4m3.074s\nsys 0m39.179s\n$ time python3 ddp.py --world-size 3 > /dev/null\n\nreal 0m23.047s\nuser 3m51.172s\nsys 0m11.483s\n$ time python3 ddp.py --world-size 4 > /dev/null\n\nreal 0m18.195s\nuser 2m55.241s\nsys 0m12.841s\n$ time python3 ddp.py --world-size 2 > /dev/null\n\nreal 0m26.955s\nuser 4m15.837s\nsys 0m7.127s\nRun Code Online (Sandbox Code Playgroud)\ntorch.set_num_threads(mp.cpu_count() // world_size)\nRun Code Online (Sandbox Code Playgroud)\nDDP在单个节点上似乎并不是特别有利。除非你有一个模型,它做了很多工作,特别是 pytorch 操作内并行性处理得不好,有大批量,并且最好是具有较少参数和更多操作的模型,这意味着要同步的梯度较少,例如非常大的卷积模型输入。
\nDDP 可能有用的其他场景是,如果您在模型中使用太多 Python,而不是矢量化操作。
\n| 归档时间: |
|
| 查看次数: |
565 次 |
| 最近记录: |