当 CPU 仅在 pytorch 中可用时,如何并行化一个批次的训练循环样本?

Cha*_*ker 14 machine-learning multiprocessing deep-learning conv-neural-network pytorch

我想并行处理单个示例或一批示例(在我的情况下,我只有 cpus,我有多达 112 个)。我试过了,但我得到一个错误,即损失不能从单独的进程中产生梯度(这完全破坏了我的尝试)。我仍然想这样做,并且在多重处理发生后我可以做一个优化器步骤是必不可少的。我该如何解决?我做了一个完全独立的例子:


import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader

from torch.multiprocessing import Pool

class SimpleDataSet(Dataset):

    def __init__(self, Din, num_examples=23):
        self.x_dataset = [torch.randn(Din) for _ in range(num_examples)]
        # target function is x*x
        self.y_dataset = [x**2 for x in self.x_dataset]

    def __len__(self):
        return len(self.x_dataset)

    def __getitem__(self, idx):
        return self.x_dataset[idx], self.y_dataset[idx]

def get_loss(args):
    x, y, model = args
    y_pred = model(x)
    criterion = nn.MSELoss()
    loss = criterion(y_pred, y)
    return loss

def get_dataloader(D, num_workers, batch_size):
    ds = SimpleDataSet(D)
    dl = DataLoader(ds, batch_size=batch_size, num_workers=num_workers)
    return dl

def train_fake_data():
    num_workers = 2
    Din, Dout = 3, 1
    model = nn.Linear(Din, Dout).share_memory()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

    batch_size = 2
    num_epochs = 10
    # num_batches = 5
    num_procs = 5
    dataloader = get_dataloader(Din, num_workers, batch_size)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    for epoch in range(num_epochs):
        for _, batch in enumerate(dataloader):
            batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in batch]
            with Pool(num_procs) as pool:
                optimizer.zero_grad()

                losses = pool.map(get_loss, batch)
                loss = torch.mean(losses)
                loss.backward()

                optimizer.step()
            # scheduler
            scheduler.step()


if __name__ == '__main__':
    # start = time.time()
    # train()
    train_fake_data()
    # print(f'execution time: {time.time() - start}')
Run Code Online (Sandbox Code Playgroud)

错误:

Traceback (most recent call last):
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
    runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
    train_fake_data()
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
    losses = pool.map(get_loss, batch)
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries.  If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'
Run Code Online (Sandbox Code Playgroud)

我确定我想这样做。我该怎么做?


使用 DDP 的新尝试

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model ran in parallel but not distributed accross gpus
    (only processes with their own gpu or hardware)
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for process to communicate/coordinate with each other/master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    Distributed function to be implemented later.

    This is the function that is actually ran in each distributed process.

    Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
    you don’t need to worry about different DDP processes start from different model parameter initial values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)

    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)

    for batch_idx, batch in enumerate(data):
        x, y = batch
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        optimizer.zero_grad()
        outputs = ddp_model(x)
        labels = y.to(rank) if torch.cuda.is_available() else y
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
        optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # args
    world_size = mp.cpu_count()
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)

if __name__ == "__main__":
    print('starting __main__')
    main()
    print('Done!\a\n')
Run Code Online (Sandbox Code Playgroud)

它似乎有效,但我的问题在第 74 行,我需要这样做吗

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
Run Code Online (Sandbox Code Playgroud)

或者

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()
Run Code Online (Sandbox Code Playgroud)

让它在多个 CPU 中正常工作?


即使我有 112 个 cpu 内核,串行还是比并行快?

我当前的问题是当只有 cpu 可用时,cpu 并行作业比串行运行慢。


相关研究链接:

Bob*_*Bob 4

Torch 将使用多个 CPU 来并行化操作,因此您的串行可能正在使用多核矢量化。

\n

举这个简单的例子

\n
import torch\nc = 0;\nfor i in range(10000):\n    A = torch.randn(1000, 1000, device=\'cpu\');\n    B = torch.randn(1000, 1000, device=\'cpu\');\n    c += torch.sum(A @ B)\n
Run Code Online (Sandbox Code Playgroud)\n

没有使用代码来并行化,但是 12 个 CPU 的 80% 采用默认配置。

\n

在此输入图像描述

\n

您可以使用torch.set_num_threads设置 CPU 上的操作内并行度。特别是如果您正在运行多个进程并且希望每个进程使用单个 CPU,您可能需要将每个进程中的操作内并行度设置为1

\n

然而,并行化操作是有成本的,我无法详细介绍实现细节,但我们可以运行一个快速实验来显示使用多个线程的开销。

\n
import matplotlib.pyplot as plt\nimport numpy as np\nimport torch;\nimport time;\nA = torch.randn(1000, 1000, device=\'cpu\');\nB = torch.randn(1000, 1000, device=\'cpu\');\nfuncs = {\n    \'sin\': lambda a,b: torch.sin(A),\n    \'tanh\': lambda a,b: torch.tanh(A),\n    \'log\': lambda a,b: torch.log(A),\n    \'matmul\': lambda a,b: A @ B.T\n}\nt = np.zeros(20)\nfor k,f in funcs.items():\n    for i in range(1, len(t) + 1):\n        torch.set_num_threads(i)\n        c = 0;\n        t0 = time.time();\n        for _ in range(100):\n            f(A,B)\n        tf = time.time()\n        t[i-1] = (tf - t0)*i;\n    plt.plot(np.arange(1, len(t)+1), t, \'-o\', label=k)\nplt.xlabel(\'Number of threads\')\nplt.legend()\nplt.ylabel(\'Core x time\')\n
Run Code Online (Sandbox Code Playgroud)\n

通过并行性,操作往往运行得更快\n在此输入图像描述

\n

但如果我们将总 CPU 时间乘以线程数,我们会发现单线程版本效率更高。

\n

在此输入图像描述

\n

如果您能够通过运行独立进程在更高级别上并行化您的实验,您应该尝试为每个进程使用单个核心,否则每个进程将尝试使用所有 CPU,并且所有进程都将运行非常慢,因为您的系统超载。

\n

调整 DDP 示例

\n

我故意修改了示例脚本的超参数,以支持多进程。

\n
    \n
  • 初始化开销相对较少
  • \n
  • 进程之间的通信相对较少
  • \n
\n
"""\nBased on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html\n\nNote: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use\ndifferent communication backends and are not restricted to being executed on the same machine.\n"""\nimport torch\nfrom torch import nn, optim\nimport torch.distributed as dist\nimport torch.multiprocessing as mp\nfrom torch.nn.parallel import DistributedDataParallel as DDP\nimport argparse\nimport os\n\n# More than one epoch so that the initialization is less significant\n# than compared to the model processing time\nnum_epochs = 10\n# for the experiment select a number that has a lot of divisors\n# as I want to test with equal number of batches\nnum_batches = 16*9*5\n# Uses a larger batch so that more work is done in each process\n# between two gradient synchronizations\n# apparently the intraop optimization is not helping \n# (at least not too much) in the batch dimension\nbatch_size = 10000\n# Use smaller dimensions, so that the intraop parallelization becomes less \n# helpful\nDin, Dout = 3, 5\ndata_x = torch.randn(batch_size, Din)\ndata_y = torch.randn(batch_size, Dout)\ndata = [(i*data_x, i*data_y) for i in range(num_batches)]\n\nclass OneDeviceModel(nn.Module):\n    """\n    Toy example for a model ran in parallel but not distributed accross gpus\n    (only processes with their own gpu or hardware)\n    """\n    def __init__(self):\n        super().__init__()\n        # -- Use more layers\n        self.net = [nn.Linear(Din, Din) for _ in range(10)]\n        # -- Bob: use more complex activation  \n        self.tanh = nn.Tanh()\n        self.sigmoid = nn.Sigmoid()\n        self.relu = nn.ReLU()\n        self.net2 = nn.Linear(Din, Dout)\n\n    def forward(self, x):\n      # apply the 10 layers sequentially\n      for i in range(10):\n        x = self.net[i](x)\n        x = self.sigmoid(x)\n        x = self.tanh(x)\n        x = self.relu(x)\n      return self.net2(x)\n\ndef setup_process(rank, world_size, backend=\'gloo\'):\n    """\n    Initialize the distributed environment (for each process).\n\n    gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that\n    it\'s a library/API for process to communicate/coordinate with each other/master. It\'s a backend library.\n    """\n    # set up the master\'s ip address so this child process can coordinate\n    # os.environ[\'MASTER_ADDR\'] = \'127.0.0.1\'\n    os.environ[\'MASTER_ADDR\'] = \'localhost\'\n    os.environ[\'MASTER_PORT\'] = \'12355\'\n\n    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends\n    if torch.cuda.is_available():\n        backend = \'nccl\'\n    # Initializes the default distributed process group, and this will also initialize the distributed package.\n    dist.init_process_group(backend, rank=rank, world_size=world_size)\n\ndef cleanup():\n    """ Destroy a given process group, and deinitialize the distributed package """\n    dist.destroy_process_group()\n\ndef run_parallel_training_loop(rank, world_size):\n    """\n    Distributed function to be implemented later.\n\n    This is the function that is actually ran in each distributed process.\n\n    Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,\n    you don\xe2\x80\x99t need to worry about different DDP processes start from different model parameter initial values.\n    """\n    print()\n    print(f"Start running DDP with model parallel example on rank: {rank}.")\n    print(f\'current process: {mp.current_process()}\')\n    print(f\'pid: {os.getpid()}\')\n    setup_process(rank, world_size)\n    torch.set_num_threads(mp.cpu_count() // world_size)\n    # create model and move it to GPU with id rank\n    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()\n    # ddp_model = DDP(model, device_ids=[rank])\n    ddp_model = DDP(model)\n    for _ in range(num_epochs):\n      for batch_idx, batch in enumerate(data[rank::world_size]):\n          x, y = batch\n          loss_fn = nn.MSELoss()\n          optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)\n\n          optimizer.zero_grad()\n          outputs = ddp_model(x)\n          labels = y.to(rank) if torch.cuda.is_available() else y\n          # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.\n          loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.\n          optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?\n\n    print()\n    print(f"Start running DDP with model parallel example on rank: {rank}.")\n    print(f\'current process: {mp.current_process()}\')\n    print(f\'pid: {os.getpid()}\')\n    # Destroy a given process group, and deinitialize the distributed package\n    cleanup()\n\ndef main():\n    print()\n    print(\'running main()\')\n    print(f\'current process: {mp.current_process()}\')\n    print(f\'pid: {os.getpid()}\')\n    parser = argparse.ArgumentParser()\n    parser.add_argument(\'--world-size\', default=1, type=int)\n    args = parser.parse_args()\n    assert num_batches % args.world_size == 0\n    mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)\n\nif __name__ == "__main__":\n    print(\'starting __main__\')\n    main()\n    print(\'Done!\\a\\n\')\n
Run Code Online (Sandbox Code Playgroud)\n
$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal    0m59.092s\nuser    8m46.589s\nsys     0m7.320s\n\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal    1m11.124s\nuser    10m54.209s\nsys     0m9.595s\n\n$ time python3 ddp.py --world-size 6 > /dev/null\n\nreal    0m18.348s\nuser    2m28.799s\nsys     0m18.068s\n$ time python3 ddp.py --world-size 12 > /dev/null\n\nreal    0m26.352s\nuser    4m3.074s\nsys     0m39.179s\n$ time python3 ddp.py --world-size 3 > /dev/null\n\nreal    0m23.047s\nuser    3m51.172s\nsys     0m11.483s\n$ time python3 ddp.py --world-size 4 > /dev/null\n\nreal    0m18.195s\nuser    2m55.241s\nsys     0m12.841s\n$ time python3 ddp.py --world-size 2 > /dev/null\n\nreal    0m26.955s\nuser    4m15.837s\nsys     0m7.127s\n
Run Code Online (Sandbox Code Playgroud)\n

如果我删除该行

\n
"""\nBased on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html\n\nNote: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use\ndifferent communication backends and are not restricted to being executed on the same machine.\n"""\nimport torch\nfrom torch import nn, optim\nimport torch.distributed as dist\nimport torch.multiprocessing as mp\nfrom torch.nn.parallel import DistributedDataParallel as DDP\nimport argparse\nimport os\n\n# More than one epoch so that the initialization is less significant\n# than compared to the model processing time\nnum_epochs = 10\n# for the experiment select a number that has a lot of divisors\n# as I want to test with equal number of batches\nnum_batches = 16*9*5\n# Uses a larger batch so that more work is done in each process\n# between two gradient synchronizations\n# apparently the intraop optimization is not helping \n# (at least not too much) in the batch dimension\nbatch_size = 10000\n# Use smaller dimensions, so that the intraop parallelization becomes less \n# helpful\nDin, Dout = 3, 5\ndata_x = torch.randn(batch_size, Din)\ndata_y = torch.randn(batch_size, Dout)\ndata = [(i*data_x, i*data_y) for i in range(num_batches)]\n\nclass OneDeviceModel(nn.Module):\n    """\n    Toy example for a model ran in parallel but not distributed accross gpus\n    (only processes with their own gpu or hardware)\n    """\n    def __init__(self):\n        super().__init__()\n        # -- Use more layers\n        self.net = [nn.Linear(Din, Din) for _ in range(10)]\n        # -- Bob: use more complex activation  \n        self.tanh = nn.Tanh()\n        self.sigmoid = nn.Sigmoid()\n        self.relu = nn.ReLU()\n        self.net2 = nn.Linear(Din, Dout)\n\n    def forward(self, x):\n      # apply the 10 layers sequentially\n      for i in range(10):\n        x = self.net[i](x)\n        x = self.sigmoid(x)\n        x = self.tanh(x)\n        x = self.relu(x)\n      return self.net2(x)\n\ndef setup_process(rank, world_size, backend=\'gloo\'):\n    """\n    Initialize the distributed environment (for each process).\n\n    gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that\n    it\'s a library/API for process to communicate/coordinate with each other/master. It\'s a backend library.\n    """\n    # set up the master\'s ip address so this child process can coordinate\n    # os.environ[\'MASTER_ADDR\'] = \'127.0.0.1\'\n    os.environ[\'MASTER_ADDR\'] = \'localhost\'\n    os.environ[\'MASTER_PORT\'] = \'12355\'\n\n    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends\n    if torch.cuda.is_available():\n        backend = \'nccl\'\n    # Initializes the default distributed process group, and this will also initialize the distributed package.\n    dist.init_process_group(backend, rank=rank, world_size=world_size)\n\ndef cleanup():\n    """ Destroy a given process group, and deinitialize the distributed package """\n    dist.destroy_process_group()\n\ndef run_parallel_training_loop(rank, world_size):\n    """\n    Distributed function to be implemented later.\n\n    This is the function that is actually ran in each distributed process.\n\n    Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,\n    you don\xe2\x80\x99t need to worry about different DDP processes start from different model parameter initial values.\n    """\n    print()\n    print(f"Start running DDP with model parallel example on rank: {rank}.")\n    print(f\'current process: {mp.current_process()}\')\n    print(f\'pid: {os.getpid()}\')\n    setup_process(rank, world_size)\n    torch.set_num_threads(mp.cpu_count() // world_size)\n    # create model and move it to GPU with id rank\n    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()\n    # ddp_model = DDP(model, device_ids=[rank])\n    ddp_model = DDP(model)\n    for _ in range(num_epochs):\n      for batch_idx, batch in enumerate(data[rank::world_size]):\n          x, y = batch\n          loss_fn = nn.MSELoss()\n          optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)\n\n          optimizer.zero_grad()\n          outputs = ddp_model(x)\n          labels = y.to(rank) if torch.cuda.is_available() else y\n          # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.\n          loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.\n          optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?\n\n    print()\n    print(f"Start running DDP with model parallel example on rank: {rank}.")\n    print(f\'current process: {mp.current_process()}\')\n    print(f\'pid: {os.getpid()}\')\n    # Destroy a given process group, and deinitialize the distributed package\n    cleanup()\n\ndef main():\n    print()\n    print(\'running main()\')\n    print(f\'current process: {mp.current_process()}\')\n    print(f\'pid: {os.getpid()}\')\n    parser = argparse.ArgumentParser()\n    parser.add_argument(\'--world-size\', default=1, type=int)\n    args = parser.parse_args()\n    assert num_batches % args.world_size == 0\n    mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)\n\nif __name__ == "__main__":\n    print(\'starting __main__\')\n    main()\n    print(\'Done!\\a\\n\')\n
Run Code Online (Sandbox Code Playgroud)\n
$ time python3 ddp.py --world-size 4 > /dev/null\n\nreal    0m40.574s\nuser    6m39.176s\nsys     0m19.025s\n\n$ time python3 ddp.py --world-size 2 > /dev/null\n\nreal    0m28.066s\nuser    3m17.775s\nsys     0m8.410s\n\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal    0m37.114s\nuser    2m19.743s\nsys     0m4.866s\n
Run Code Online (Sandbox Code Playgroud)\n

使用

\n
$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal    0m59.092s\nuser    8m46.589s\nsys     0m7.320s\n\n$ time python3 ddp.py --world-size 1 > /dev/null\n\nreal    1m11.124s\nuser    10m54.209s\nsys     0m9.595s\n\n$ time python3 ddp.py --world-size 6 > /dev/null\n\nreal    0m18.348s\nuser    2m28.799s\nsys     0m18.068s\n$ time python3 ddp.py --world-size 12 > /dev/null\n\nreal    0m26.352s\nuser    4m3.074s\nsys     0m39.179s\n$ time python3 ddp.py --world-size 3 > /dev/null\n\nreal    0m23.047s\nuser    3m51.172s\nsys     0m11.483s\n$ time python3 ddp.py --world-size 4 > /dev/null\n\nreal    0m18.195s\nuser    2m55.241s\nsys     0m12.841s\n$ time python3 ddp.py --world-size 2 > /dev/null\n\nreal    0m26.955s\nuser    4m15.837s\nsys     0m7.127s\n
Run Code Online (Sandbox Code Playgroud)\n
torch.set_num_threads(mp.cpu_count() // world_size)\n
Run Code Online (Sandbox Code Playgroud)\n

我的想法

\n

DDP在单个节点上似乎并不是特别有利。除非你有一个模型,它做了很多工作,特别是 pytorch 操作内并行性处理得不好,有大批量,并且最好是具有较少参数和更多操作的模型,这意味着要同步的梯度较少,例如非常大的卷积模型输入。

\n

DDP 可能有用的其他场景是,如果您在模型中使用太多 Python,而不是矢量化操作。

\n