Using Distributed Computing in PyTorch

Test Environment

Verified on Linux with cuDNN installed; Windows is untested.

API Used

We use the Distributed RPC Framework library within torch.distributed. The backend for torch.distributed can be selected at runtime from three options: gloo, mpi, and nccl. Each backend supports different tensor types (CPU, GPU) depending on the function. In general, gloo is recommended for CPU tensors and nccl for GPU tensors. Note, however, that nccl cannot handle CPU tensors, and some methods are unavailable with it.
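
As a minimal sketch, the backend choice happens at process-group initialization. The helper below is illustrative (the address and port are placeholders, not values from this article):

```python
import os
import torch.distributed as dist

def init_backend(rank: int, world_size: int, use_gpu: bool) -> None:
    # Illustrative helper: nccl for GPU tensors, gloo for CPU tensors.
    backend = "nccl" if use_gpu else "gloo"
    # The default env:// init method reads these variables (placeholder values).
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group(backend, rank=rank, world_size=world_size)
```

With `world_size=1` this runs in a single process, which is enough to try collective calls such as `dist.all_reduce` locally.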

For details on which methods become unavailable, refer to the official documentation.

The PyTorch documentation states:

Use multi-machine DistributedDataParallel and the launching script, if the application needs to scale across machine boundaries.

At first glance, it seems RPC is unnecessary and DistributedDataParallel would suffice. However, data-parallel APIs such as DistributedDataParallel are geared toward supervised learning. For example:

In reinforcement learning, it might be relatively expensive to acquire training data from environments while the model itself can be quite small. In this case, it might be useful to spawn multiple observers running in parallel and share a single agent. In this case, the agent takes care of the training locally, but the application would still need libraries to send and receive data between observers and the trainer.

DistributedDataParallel cannot handle synchronous on-policy (SyncOnPolicy) setups like the one above. RPC is recommended as a more general-purpose API that covers such scenarios.

https://pytorch.org/tutorials/beginner/dist_overview.html#general-distributed-training
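
A minimal sketch of the observer/agent pattern described above, assuming a single trainer worker. The names here (`Agent`, `report_reward`, the worker name `"agent"`) are illustrative, not the tutorial's API:

```python
from torch.distributed import rpc

class Agent:
    # Illustrative agent that collects rewards reported by observers.
    def __init__(self):
        self.rewards = []

    def report_reward(self, reward: float) -> int:
        # Record one observation; return how many have been seen so far.
        self.rewards.append(reward)
        return len(self.rewards)

# Lives on the trainer's worker; observers invoke it via RPC.
agent = Agent()

def report(reward: float) -> int:
    return agent.report_reward(reward)

# On an observer worker, data is sent to the trainer like this
# (assumes rpc.init_rpc has already been called on every worker):
#   count = rpc.rpc_sync("agent", report, args=(1.0,))
```

The agent trains locally on the collected data; only the reporting traffic goes over RPC.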

Note

The term RPC can refer to any of the following depending on the context in the official documentation. Be careful not to confuse them.

  1. Distributed RPC Framework
    • A library within torch.distributed.
  2. The RPC feature within the above framework
  3. Remote procedure calls in general

Distributed RPC Framework

The API within torch.distributed that is primarily used for reinforcement learning. It is not supported on Windows (partial support starts from the version after next).

Tutorial on building a Parameter Server using RPC

  1. Remote Procedure Call (RPC)
    • An API for calling a function on a remote worker with specified arguments and receiving its return value
      • The caller blocks until the return value is received (rpc_sync)
      • The caller receives a Future immediately and waits for the return value only when needed (rpc_async)
      • The caller receives a reference (RRef) to the return value immediately and fetches it only when needed (remote)
  2. Remote Reference (RRef)
    • A reference to an object on another machine
  3. Distributed Autograd
    • Provides functionality for sending gradients between multiple machines
    • An API that tracks the flow of gradients across multiple machines and computes them
  4. Distributed Optimizer
    • An API that updates the parameters on each worker using the gradients computed by Distributed Autograd
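
As a sketch, the three call styles above differ only in when the caller blocks. A single worker calling itself is enough to see the API shapes (the function name below is illustrative):

```python
import torch
from torch.distributed import rpc

def call_three_ways(dst: str):
    # Assumes rpc.init_rpc() has been called; dst is a worker name.
    t = torch.ones(2)

    # rpc_sync: blocks until the return value arrives.
    r_sync = rpc.rpc_sync(dst, torch.add, args=(t, t))

    # rpc_async: returns a Future immediately; wait() when the value is needed.
    fut = rpc.rpc_async(dst, torch.add, args=(t, t))
    r_async = fut.wait()

    # remote: returns an RRef immediately; to_here() fetches the value.
    rref = rpc.remote(dst, torch.add, args=(t, t))
    r_remote = rref.to_here()

    return r_sync, r_async, r_remote
```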

Distributed Autograd

Tracks the computation graph of data received from remote machines and can compute gradients and backpropagate with respect to the weights on each machine.

  • Usage example
>>> import torch
>>> import torch.distributed.autograd as dist_autograd
>>> from torch.distributed import rpc
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])

Distributed Optimizer

https://pytorch.org/docs/master/rpc.html#module-torch.distributed.optim

Updates the remote and local parameters (passed in as RRefs) using the gradients computed by Distributed Autograd. Example:

import os
import torch
import torch.multiprocessing as mp
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer

def random_tensor():
    return torch.rand((3, 3), requires_grad=True)

def _run_process(rank, dst_rank, world_size):
    name = "worker{}".format(rank)
    dst_name = "worker{}".format(dst_rank)

    # Initialize RPC.
    rpc.init_rpc(
        name=name,
        rank=rank,
        world_size=world_size
    )

    # Use a distributed autograd context.
    with dist_autograd.context() as context_id:
        # Forward pass (create references on remote nodes).
        rref1 = rpc.remote(dst_name, random_tensor)
        rref2 = rpc.remote(dst_name, random_tensor)
        loss = rref1.to_here() + rref2.to_here()

        # Backward pass (run distributed autograd).
        dist_autograd.backward(context_id, [loss.sum()])

        # Build DistributedOptimizer.
        dist_optim = DistributedOptimizer(
            optim.SGD,
            [rref1, rref2],
            lr=0.05,
        )

        # Run the distributed optimizer step.
        dist_optim.step(context_id)

def run_process(rank, world_size):
    dst_rank = (rank + 1) % world_size
    _run_process(rank, dst_rank, world_size)
    rpc.shutdown()

if __name__ == '__main__':
    # RPC's default init method reads these environment variables;
    # spawned workers inherit them from the parent process.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")
    # Run world_size workers
    world_size = 2
    mp.spawn(run_process, args=(world_size,), nprocs=world_size)

Usage Examples

SyncOnPolicy

https://pytorch.org/tutorials/intermediate/rpc_tutorial.html#distributed-reinforcement-learning-using-rpc-and-rref

AsyncOnPolicy (ParameterServer)

https://pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html