DDP (distributed data parallel)¶
Scope: PyTorch DistributedDataParallel, which replicates the model on every GPU, shards the data, and all-reduces gradients each step. The simplest, fastest data-parallel strategy when the model fits one GPU.
Reference templates use real APIs (torch, NCCL, torchrun): pin versions and validate before production use. The torch snippets below need CUDA GPUs and are not executed here; each core algorithm they teach is mirrored by a runnable, asserted numpy block (run with python3) that validates the underlying math.
What it is¶
DDP runs an identical replica of the model on every rank. Each rank processes a different shard of the global batch (distinct data), runs forward and backward independently, then all-reduces the gradients across all ranks so every replica applies the same averaged update and stays in lockstep. Parameters and optimizer state are not sharded: each GPU holds a full copy (contrast FSDP, which shards them, FSDP). Communication is one all-reduce of the gradients per step, overlapped with the backward pass.
The API is torch.nn.parallel.DistributedDataParallel, a module wrapper that registers autograd hooks to trigger the gradient all-reduce. It is the baseline data-parallel axis in distributed training and the building block that HSDP replicates across nodes (FSDP).
Why use it¶
- Simplest and fastest when the model fits. One collective per step (gradient all-reduce), fully overlapped with backward: minimal communication, near-linear scaling on a good fabric. Nothing to shard, nothing to gather.
- Lower comm than FSDP in the fits-in-memory regime: DDP does a single all-reduce; FSDP does all-gather + reduce-scatter every layer.
- Native, batteries-included. DistributedDataParallel + torchrun + DistributedSampler is the canonical PyTorch multi-GPU recipe; every higher-level trainer builds on it.
When to use it (and when not)¶
- Use DDP when one full copy of the model + optimizer state + a microbatch of activations fits on a single GPU. That covers most models up to a few billion parameters, and larger ones with activation checkpointing.
- Switch to FSDP (FSDP) the moment a full replica no longer fits: DDP cannot help there because it never shards parameters or optimizer state.
- Combine, do not replace. Under HSDP, DDP-style replication is the inter-node dimension over FSDP shards; under TP (tensor parallelism) or PP (pipeline parallelism), DDP is the outer data-parallel wrapper around model-parallel groups.
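The "does a full replica fit?" test above can be approximated with simple arithmetic. The sketch below is an estimate under stated assumptions, not a measurement: it assumes fp32 weights, fp32 gradients, and AdamW with two fp32 moment buffers (16 bytes per parameter in total), and the function names, the 10% headroom factor, and the activation figure are hypothetical. Real footprints also depend on mixed precision, allocator fragmentation, and framework overhead.

```python
GIB = 2 ** 30

def replica_bytes(num_params, bytes_param=4, bytes_grad=4, bytes_optim=8):
    """Bytes for one full DDP replica: weights + gradients + optimizer state.

    Assumed defaults: fp32 weights (4), fp32 grads (4), AdamW moments (2 x 4).
    """
    return num_params * (bytes_param + bytes_grad + bytes_optim)

def fits_ddp(num_params, gpu_bytes, activation_bytes=0, headroom=0.9):
    """True if replica + activations fit within a fraction of GPU memory."""
    return replica_bytes(num_params) + activation_bytes <= headroom * gpu_bytes

# Hypothetical example: an 80 GiB GPU with 20 GiB budgeted for activations.
small_fits = fits_ddp(1_000_000_000, 80 * GIB, activation_bytes=20 * GIB)
large_fits = fits_ddp(7_000_000_000, 80 * GIB, activation_bytes=20 * GIB)
print(f"1B params: {replica_bytes(1_000_000_000) / 1e9:.0f} GB state, fits={small_fits}")
print(f"7B params: {replica_bytes(7_000_000_000) / 1e9:.0f} GB state, fits={large_fits}")
```

Under these assumptions the 1B-parameter model fits with room to spare, while the 7B-parameter model needs about 112 GB of state alone, which is the point at which sharding becomes necessary.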
Architecture¶
flowchart TB
subgraph G0["GPU 0"]
R0["Full model replica"]
B0["Backward grads"]
end
subgraph G1["GPU 1"]
R1["Full model replica"]
B1["Backward grads"]
end
subgraph G2["GPU N"]
R2["Full model replica"]
B2["Backward grads"]
end
R0 --> B0
R1 --> B1
R2 --> B2
B0 ==>|"all-reduce grads"| AR["Averaged gradient"]
B1 ==>|"all-reduce grads"| AR
B2 ==>|"all-reduce grads"| AR
AR -.->|"identical update"| R0
How to use it¶
Initialize the process group, wrap the model, and shard the data with DistributedSampler. torchrun sets RANK/LOCAL_RANK/WORLD_SIZE.
# Reference template: torch + NCCL + CUDA GPUs (not executed here).
# torchrun --standalone --nproc_per_node=8 train.py
import os, torch, torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
dist.init_process_group("nccl") # one process per GPU
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
model = Model().to(local_rank)
model = DDP(model, device_ids=[local_rank]) # registers grad all-reduce hooks
sampler = DistributedSampler(dataset) # disjoint shard per rank
loader = DataLoader(dataset, batch_size=32, sampler=sampler)
# Reference template: torch + NCCL + CUDA GPUs (not executed here).
opt = torch.optim.AdamW(model.parameters(), lr=2e-5)
for epoch in range(epochs):
    sampler.set_epoch(epoch)  # reshuffle deterministically
    for batch in loader:
        loss = model(**batch).loss
        loss.backward()  # all-reduce fires here
        opt.step(); opt.zero_grad()
dist.destroy_process_group()
Validated (numpy, runnable). DDP's correctness rests on one identity: averaging the per-rank gradients (each computed on an equal shard) equals the gradient of the mean loss over the whole global batch, so every replica applies the same update and stays in lockstep. The block below proves that equivalence against a single-GPU reference, confirms lockstep, and catches two real failure modes: unequal shards (which corrupt the average) and a skipped all-reduce (which lets replicas diverge).
import numpy as np
rng = np.random.default_rng(0)
D, N, R = 4, 12, 3 # 4 features, 12 samples, 3 ranks (12 / 3 = 4 per shard)
X = rng.standard_normal((N, D))
y = rng.standard_normal(N)
w = rng.standard_normal(D)
def mse_grad(Xb, yb, w):
    # gradient of mean((Xb @ w - yb) ** 2) wrt w
    err = Xb @ w - yb
    return (2.0 / len(yb)) * (Xb.T @ err)
# Reference: the full-batch gradient (single-GPU ground truth).
g_full = mse_grad(X, y, w)
# DDP: each rank computes its local-mean gradient on an equal shard,
# then all-reduce AVERAGES the per-rank gradients.
shards = np.array_split(np.arange(N), R)
assert all(len(s) == N // R for s in shards) # equal-shard precondition
g_ranks = [mse_grad(X[s], y[s], w) for s in shards]
g_ddp = np.mean(g_ranks, axis=0) # all-reduce(mean)
# Equivalence to the slow single-GPU reference (the DDP correctness guarantee).
assert np.allclose(g_ddp, g_full, atol=1e-12), (g_ddp, g_full)
# Lockstep: identical start + identical averaged grad => identical replicas.
lr = 0.1
replicas = [w.copy() - lr * g_ddp for _ in range(R)]
for wi in replicas:
    assert np.array_equal(wi, replicas[0])  # every replica bit-identical
# Adversarial 1: unequal shards break the average (why drop_last / padding matters).
bad = [np.arange(0, 2), np.arange(2, 4), np.arange(4, N)] # sizes 2, 2, 8
g_bad = np.mean([mse_grad(X[s], y[s], w) for s in bad], axis=0)
assert not np.allclose(g_bad, g_full, atol=1e-6) # uneven shards => wrong gradient
# Adversarial 2: skipping the all-reduce diverges the replicas.
no_sync = [w - lr * g_ranks[r] for r in range(R)]
assert not np.array_equal(no_sync[0], no_sync[1]) # each rank drifts on its own shard
print("block1 OK: g_ddp==g_full, replicas in lockstep, unequal-shard + no-sync caught")
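The equal-shard precondition above depends on how the sampler splits indices across ranks. The following is a pure-Python sketch of the padding-and-striding scheme that DistributedSampler is understood to use (shuffle with a seed derived from the epoch, pad or truncate to a multiple of the world size, then take every world_size-th index). The function name shard_indices is hypothetical and the shuffle uses Python's random module rather than torch, so the actual index order will differ from PyTorch's.

```python
import random

def shard_indices(n, world_size, rank, epoch=0, seed=0, drop_last=False):
    """Return the dataset indices one rank would see in one epoch (sketch)."""
    assert n >= world_size, "sketch assumes at least one sample per rank"
    indices = list(range(n))
    # Same seed + epoch on every rank => every rank builds the same permutation.
    random.Random(seed + epoch).shuffle(indices)
    if drop_last:
        total = (n // world_size) * world_size      # truncate the tail
        indices = indices[:total]
    else:
        total = -(-n // world_size) * world_size    # round up, pad by repeating
        indices += indices[: total - n]
    return indices[rank:total:world_size]           # strided, equal-sized shard

n, world = 10, 4
shards = [shard_indices(n, world, r, epoch=0) for r in range(world)]
dropped = [shard_indices(n, world, r, epoch=0, drop_last=True) for r in range(world)]

# world_size=1 exposes the full permutation for a given epoch.
order_epoch0 = shard_indices(n, 1, 0, epoch=0)
order_epoch1 = shard_indices(n, 1, 0, epoch=1)
order_repeat = shard_indices(n, 1, 0, epoch=0)  # what happens without set_epoch

print("shard sizes (padded):   ", [len(s) for s in shards])
print("shard sizes (drop_last):", [len(s) for s in dropped])
print("epoch changes order:", order_epoch0 != order_epoch1)
print("no set_epoch repeats order:", order_epoch0 == order_repeat)
```

Padding keeps every rank's shard the same length at the cost of a few repeated samples, while drop_last discards the remainder instead. Reusing the same epoch value reproduces the same permutation, which is why the training loop calls sampler.set_epoch(epoch).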
How to integrate with it¶
DDP integrates at two levels: the low-level training-loop knobs it exposes, and the higher-level trainers that wrap it.
Training-loop knobs¶
- Gradient bucketing. DDP groups gradients into buckets (default ~25 MB) and launches an all-reduce per bucket as soon as it fills during backward, overlapping communication with the remaining backward compute. Tune bucket_cap_mb to balance overlap (smaller buckets start sooner) against collective efficiency (larger buckets amortize launch cost).
Validated (numpy, runnable). Bucketing must be transparent: reducing per bucket and stitching the results back in parameter order has to equal one flat all-reduce over all gradients, for any bucket size. The block proves that across several caps and shows that a single unreduced bucket (a genuinely dropped collective, a correctness bug distinct from a GDR-to-TCP fallback, which still all-reduces correctly, only slower) is detectable.
import numpy as np
rng = np.random.default_rng(2)
R = 4 # ranks
# Per-parameter gradients, one list per rank. Shapes mimic layers of varying size.
shapes = [(3,), (2, 4), (5,), (1,), (3, 3)]
grads = [[rng.standard_normal(s) for s in shapes] for _ in range(R)]
def flat_allreduce(grads):
    # ground truth: average the fully-flattened gradient across ranks.
    flats = [np.concatenate([g.ravel() for g in rank]) for rank in grads]
    return np.mean(flats, axis=0)

def bucketed_allreduce(grads, cap):
    # group consecutive params into buckets under `cap` elements, reduce per bucket.
    nparam = len(grads[0])
    buckets, cur, cur_sz = [], [], 0
    for p in range(nparam):
        sz = grads[0][p].size
        if cur and cur_sz + sz > cap:
            buckets.append(cur)
            cur, cur_sz = [], 0
        cur.append(p)
        cur_sz += sz
    if cur:
        buckets.append(cur)
    out = [None] * nparam
    for b in buckets:  # reduce each bucket, then stitch back in order.
        stacked = [np.concatenate([grads[r][p].ravel() for p in b]) for r in range(R)]
        red = np.mean(stacked, axis=0)
        off = 0
        for p in b:
            n = grads[0][p].size
            out[p] = red[off:off + n]
            off += n
    return np.concatenate(out)
ref = flat_allreduce(grads)
# Any bucket cap must reconstruct the identical reduction (bucketing is transparent).
for cap in (1, 4, 7, 1000):
    got = bucketed_allreduce(grads, cap)
    assert np.allclose(got, ref, atol=1e-12), cap
# Adversarial: a bucket that never gets reduced (comm dropped) => wrong average.
broken = bucketed_allreduce(grads, cap=4).copy()
n0 = grads[0][0].size
broken[:n0] = grads[0][0].ravel() # rank-0 only, unreduced
assert not np.allclose(broken, ref, atol=1e-6) # a missing reduction is detectable
print("block3 OK: bucketed all-reduce == flat all-reduce; dropped bucket caught")
- Gradient accumulation with no_sync. To accumulate over K microbatches, skip the all-reduce on the first K-1 (compute-only), then sync on the K-th; this saves K-1 collectives per effective step.
# Reference template: torch + NCCL + CUDA GPUs (not executed here).
import contextlib
accum = 4
for i, batch in enumerate(loader):
    sync = (i % accum == accum - 1)
    ctx = model.no_sync() if not sync else contextlib.nullcontext()
    with ctx:
        loss = model(**batch).loss / accum
        loss.backward()  # local accumulate; all-reduce fires only when sync is True
    if sync:
        opt.step(); opt.zero_grad()  # all-reduce happened on this step
Validated (numpy, runnable). Accumulating K microbatch gradients, each scaled by 1/K, then applying one step must equal a single backward over the concatenated K-microbatch batch. The block proves that equivalence and catches the classic bug: dropping the 1/accum scaling inflates the gradient exactly K-fold.
import numpy as np
rng = np.random.default_rng(1)
D, K, m = 5, 4, 3 # D features, K microbatches, m samples each
X = rng.standard_normal((K * m, D))
y = rng.standard_normal(K * m)
w = rng.standard_normal(D)
def mse_grad(Xb, yb, w):
    err = Xb @ w - yb
    return (2.0 / len(yb)) * (Xb.T @ err)
# Reference: one backward over the full (K*m) effective batch.
g_ref = mse_grad(X, y, w)
# no_sync accumulation: K equal microbatches, each scaled by 1/K, summed locally;
# a single all-reduce fires on the last microbatch (one rank here => identity).
micro = np.array_split(np.arange(K * m), K)
assert all(len(s) == m for s in micro)
g_accum = sum(mse_grad(X[s], y[s], w) / K for s in micro)
assert np.allclose(g_accum, g_ref, atol=1e-12), (g_accum, g_ref)
# Adversarial: forgetting the /K scaling inflates the gradient K-fold.
g_noscale = sum(mse_grad(X[s], y[s], w) for s in micro)
assert not np.allclose(g_noscale, g_ref, atol=1e-6)
assert np.allclose(g_noscale, K * g_ref, atol=1e-12) # exact corruption signature
print("block2 OK: no_sync accumulation == full-batch grad; missing /K caught (Kx)")
- find_unused_parameters=True lets DDP handle graphs where some parameters get no gradient (e.g. conditional branches), at a traversal cost; leave it False unless you hit the "parameter did not receive grad" error, since it is a real throughput hit.
- static_graph=True is an optimization when the graph is identical every iteration (enables better bucketing/checkpointing); only set it if that invariant truly holds.
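One detail the no_sync accumulation pattern leaves open is what happens when the number of microbatches in an epoch is not a multiple of K: the last partial window never reaches a sync step unless it is flushed explicitly. The sketch below plans the sync points and window sizes for one epoch. The helper name accumulation_plan is hypothetical and the flush-the-tail policy is one reasonable choice among several (dropping the tail is another).

```python
def accumulation_plan(num_batches, accum):
    """Return [(sync_index, window_size), ...] for one epoch.

    sync_index is the microbatch on which to all-reduce and step;
    window_size is how many microbatches contributed to that step.
    """
    assert accum >= 1
    plan, start = [], 0
    while start < num_batches:
        size = min(accum, num_batches - start)  # the last window may be short
        plan.append((start + size - 1, size))
        start += size
    return plan

plan = accumulation_plan(num_batches=10, accum=4)
collectives_saved = 10 - len(plan)  # one all-reduce per window instead of per microbatch
print("plan:", plan)
print("all-reduces saved vs. syncing every microbatch:", collectives_saved)
```

With 10 microbatches and K = 4, the plan has windows of 4, 4, and 2. If the loss is always divided by K, the short final window is under-weighted; dividing by the actual window size keeps each step a true mean.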
Higher-level trainers¶
Every mainstream trainer builds on this same DistributedDataParallel + torchrun core, so integration is mostly a matter of letting the framework own the wrapper:
- Hugging Face Trainer / accelerate. Launch with torchrun or accelerate launch; the trainer wraps the model in DDP automatically and surfaces the same knobs as ddp_find_unused_parameters and ddp_bucket_cap_mb.
- PyTorch Lightning. Trainer(strategy="ddp", devices=8) performs the process-group init, the DDP wrapping, and the DistributedSampler injection for you.
- RL and SFT/LoRA stacks (RL libraries, SFT and LoRA) run DDP- or FSDP-backed trainers under the hood: you select the backend, they own the collective.
How to run it in production¶
Production DDP is a launch-orchestration and fabric problem: the training code is unchanged, and what matters is how ranks are started, how the all-reduce reaches the wire, and how the job survives a failed rank.
Launch orchestration (Slurm)¶
#!/bin/bash
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-node=8
#SBATCH --exclusive
export MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n1)
srun torchrun --nnodes=$SLURM_NNODES --nproc_per_node=8 \
--rdzv_backend=c10d --rdzv_endpoint="$MASTER_ADDR:29500" \
--rdzv_id=$SLURM_JOB_ID train.py
The Slurm/torchrun launch patterns live in Slurm; end-to-end recipes in distributed-training recipes.
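To make the launch arithmetic concrete, the sketch below enumerates the ranks that a 4-node, 8-GPU-per-node job produces. It assumes a contiguous layout (global rank = node rank x processes per node + local rank), which is the common convention but not something the launcher guarantees, particularly across elastic restarts. Training code should read RANK, LOCAL_RANK, and WORLD_SIZE from the environment rather than compute them. The helper name world_layout is hypothetical.

```python
def world_layout(nnodes, nproc_per_node):
    """Map global rank -> (node_rank, local_rank) under a contiguous layout.

    Assumption: ranks are assigned node by node; the real launcher decides this.
    """
    layout = {}
    for node_rank in range(nnodes):
        for local_rank in range(nproc_per_node):
            rank = node_rank * nproc_per_node + local_rank
            layout[rank] = (node_rank, local_rank)
    return layout

layout = world_layout(nnodes=4, nproc_per_node=8)
world_size = len(layout)
print("WORLD_SIZE:", world_size)
print("rank 8  ->", layout[8])    # first GPU on the second node
print("rank 31 ->", layout[31])   # last GPU on the last node
```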
Hardware and fabric¶
The per-step gradient all-reduce (issued bucket by bucket) is the only collective, so the fabric is the whole game:
- NVLink / NVSwitch carries the intra-node portion of the all-reduce: NCCL builds a ring/tree that uses NVLink within a node before crossing the NIC (performance tuning).
- InfiniBand / RoCE + GPUDirect RDMA carries the inter-node all-reduce. Set NCCL_IB_HCA and NCCL_NET_GDR_LEVEL=SYS; verify GDR via NCCL_DEBUG=INFO showing [GDRDMA] (networking fabric).
- NCCL_NVLS_ENABLE=1 enables NVLink SHARP in-switch reduction for the all-reduce on NVSwitch systems.
- Bucket size (bucket_cap_mb) trades comm/compute overlap against per-collective efficiency, the main DDP-specific knob.
- PCIe ACS off for P2P/GDR, or NCCL silently falls back to TCP and all-reduce throughput collapses.
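For intuition about what the ring mentioned above does on the wire, here is a pure-Python simulation of a textbook ring all-reduce: a reduce-scatter phase followed by an all-gather phase. It is a teaching sketch, not NCCL's implementation, which chooses between ring, tree, and other algorithms and pipelines the transfers. The function name ring_allreduce_mean is hypothetical, and the sketch assumes the vector length divides evenly by the number of ranks.

```python
def ring_allreduce_mean(vectors):
    """Average equal-length vectors via a simulated ring all-reduce.

    Returns (per-rank results, number of elements each rank sent).
    """
    n = len(vectors)
    length = len(vectors[0])
    assert length % n == 0, "sketch assumes the vector splits into n equal chunks"
    c = length // n
    data = [list(v) for v in vectors]
    sent = [0] * n

    def chunk(k):
        return slice(k * c, (k + 1) * c)

    # Phase 1, reduce-scatter: after n-1 steps rank r owns the full sum of chunk (r+1) % n.
    for step in range(n - 1):
        # Snapshot payloads first so every rank "sends" simultaneously.
        msgs = [(r, (r - step) % n, data[r][chunk((r - step) % n)]) for r in range(n)]
        for r, k, payload in msgs:
            dst = (r + 1) % n
            data[dst][chunk(k)] = [a + b for a, b in zip(data[dst][chunk(k)], payload)]
            sent[r] += c

    # Phase 2, all-gather: circulate the finished chunks until every rank has all of them.
    for step in range(n - 1):
        msgs = [(r, (r + 1 - step) % n, data[r][chunk((r + 1 - step) % n)]) for r in range(n)]
        for r, k, payload in msgs:
            data[(r + 1) % n][chunk(k)] = payload
            sent[r] += c

    return [[x / n for x in v] for v in data], sent

# Four ranks, eight gradient elements each (integers keep the arithmetic exact).
grads = [[rank * 10 + i for i in range(8)] for rank in range(4)]
results, sent = ring_allreduce_mean(grads)
expected = [sum(col) / 4 for col in zip(*grads)]
print("all ranks agree:", all(r == expected for r in results))
print("elements sent per rank:", sent)
```

Each rank sends 2 x (N - 1) chunks of size S / N, so per-rank traffic approaches 2S as the ring grows. That makes link bandwidth, rather than rank count, the dominant cost for large gradients.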
Checkpointing and fault tolerance¶
- Checkpoint from one rank. Because every replica is identical, save model and optimizer state from rank 0 only (guard with if dist.get_rank() == 0) and wrap save/load in dist.barrier() so no rank races ahead. On resume, load with map_location onto each rank's local device and restore the optimizer state, the DistributedSampler epoch, and the RNG so the run continues deterministically.
- Elastic restarts. torchrun is TorchElastic: with --rdzv_backend=c10d and --max-restarts=N it restarts the workers and re-forms the process group after a rank dies, instead of forfeiting the whole job. The training script itself must reload the last checkpoint on startup for the run to resume where it left off.
- Fail loud, not hung. Set TORCH_NCCL_ASYNC_ERROR_HANDLING=1 and pass a timeout to init_process_group so a dead or straggling rank aborts the collective with an error instead of hanging the world indefinitely (see the NCCL-hang runbook).
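The rank-0 checkpoint pattern can be sketched without GPUs. The block below uses json as a stand-in for torch.save and a plain rank argument as a stand-in for dist.get_rank(); both substitutions are assumptions made so the example runs anywhere. It also writes to a temporary file and renames it, so a crash mid-write cannot leave a truncated checkpoint behind. The function names and state fields are hypothetical.

```python
import json
import os
import tempfile

def save_checkpoint(state, path, rank):
    """Write `state` atomically from rank 0 only; other ranks do nothing."""
    if rank != 0:
        return False
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
        f.flush()
        os.fsync(f.fileno())        # make sure bytes hit the disk before the rename
    os.replace(tmp_path, path)      # atomic rename on the same filesystem
    return True

def load_checkpoint(path):
    """Every rank reads the same file on resume."""
    with open(path) as f:
        return json.load(f)

state = {"epoch": 3, "step": 1200, "sampler_epoch": 3, "weights": [0.1, 0.2]}
with tempfile.TemporaryDirectory() as workdir:
    ckpt = os.path.join(workdir, "ckpt.json")
    wrote_rank1 = save_checkpoint(state, ckpt, rank=1)   # non-zero rank: skipped
    exists_after_rank1 = os.path.exists(ckpt)
    wrote_rank0 = save_checkpoint(state, ckpt, rank=0)   # rank 0: written
    restored = load_checkpoint(ckpt)

print("rank 1 wrote:", wrote_rank1, "| rank 0 wrote:", wrote_rank0)
print("restored matches:", restored == state)
```

In a real job the save would be bracketed by dist.barrier() calls so that no rank starts loading or continues training before rank 0 has finished writing.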
How to maintain it¶
A DDP job is only as healthy as its weakest rank; maintenance is about keeping all ranks identical, observable, and in lockstep over time.
- Pin versions across every node. All ranks must run the same PyTorch, CUDA, and NCCL build; a mismatched NCCL or torch across hosts causes silent hangs or divergent numerics. Upgrade the whole job as one coordinated redeploy, never a rolling per-node bump.
- Keep replicas identical. DDP broadcasts the initial module state from rank 0 at construction so every replica starts bit-identical; seed all ranks the same, keep broadcast_buffers=True (the default) so buffers like BatchNorm statistics stay in sync, and call sampler.set_epoch(epoch) each epoch so shards reshuffle without repeating.
- Watch for stragglers. The all-reduce is a barrier, so the slowest rank sets the step time. Track per-step all-reduce/wait time and per-GPU utilization: a single slow NIC, a thermal-throttling GPU, or a noisy neighbor stalls the whole world. Log metrics from rank 0 to avoid duplicating telemetry from every rank.
- Verify the fabric after every change. Re-check NCCL_DEBUG=INFO for [GDRDMA] and PCIe ACS state after any driver, firmware, or topology change, since a regression here silently drops the all-reduce onto TCP and throughput collapses.
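Straggler detection from the list above can start as a simple comparison of per-rank step times against the median. The sketch below is one possible heuristic, with a hypothetical function name and an arbitrary 10% tolerance. It assumes the timings have already been collected per rank, for example from compute time measured before the all-reduce.

```python
import statistics

def find_stragglers(step_times, tolerance=0.10):
    """Return ranks whose step time exceeds the median by more than `tolerance`."""
    median = statistics.median(step_times.values())
    return sorted(rank for rank, t in step_times.items() if t > median * (1 + tolerance))

# Hypothetical per-rank compute times in seconds for one step.
step_times = {0: 0.50, 1: 0.51, 2: 0.49, 3: 0.72}
stragglers = find_stragglers(step_times)
effective_step = max(step_times.values())  # the barrier makes everyone wait for the slowest

print("stragglers:", stragglers)
print(f"effective step time: {effective_step:.2f}s")
```

Because the all-reduce acts as a barrier, the effective step time here is 0.72 s even though three of the four ranks finish in about 0.50 s.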
How to scale it¶
DDP scales to multiple nodes by widening the world with torchrun; the code is unchanged, since only the launcher and the fabric matter once the per-step gradient all-reduce starts crossing the inter-node network.
# run on every node; rendezvous coordinates ranks across hosts.
torchrun \
--nnodes=4 --nproc_per_node=8 \
--rdzv_backend=c10d --rdzv_endpoint="$MASTER_ADDR:29500" \
--rdzv_id=ddp-job train.py
Multi-node DDP is all-reduce-bound: the gradient all-reduce traverses InfiniBand/RoCE every step, so fabric bandwidth and NCCL config (see production above) gate scaling efficiency. When a full replica stops fitting, move to FSDP/HSDP (FSDP).
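A back-of-envelope model shows how fabric bandwidth gates scaling. The sketch below assumes the standard ring all-reduce cost of 2 x (N - 1) / N x bytes over the slowest link, and a simplified overlap model in which communication hides behind the backward pass and only the excess is exposed. All numbers, names, and the overlap model itself are illustrative assumptions, not measurements.

```python
def allreduce_seconds(grad_bytes, world_size, link_bytes_per_s):
    """Bandwidth-only lower bound for a ring all-reduce (latency ignored)."""
    return 2 * (world_size - 1) / world_size * grad_bytes / link_bytes_per_s

def scaling_efficiency(compute_s, backward_s, comm_s):
    """Fraction of the step spent computing, if comm overlaps with backward only."""
    exposed = max(0.0, comm_s - backward_s)   # comm that backward cannot hide
    return compute_s / (compute_s + exposed)

grad_bytes = 4 * 1_000_000_000                # 1B fp32 gradients (assumed)
compute_s, backward_s = 0.30, 0.20            # hypothetical step timings

fast = allreduce_seconds(grad_bytes, 32, 50e9)     # ~400 Gb/s link (assumed)
slow = allreduce_seconds(grad_bytes, 32, 12.5e9)   # ~100 Gb/s link (assumed)
eff_fast = scaling_efficiency(compute_s, backward_s, fast)
eff_slow = scaling_efficiency(compute_s, backward_s, slow)

print(f"fast fabric: all-reduce {fast:.3f}s, efficiency {eff_fast:.0%}")
print(f"slow fabric: all-reduce {slow:.3f}s, efficiency {eff_slow:.0%}")
```

In this toy model the faster link hides the all-reduce entirely behind backward, while the slower one leaves most of it exposed and cuts efficiency to well under half.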
Inference¶
Not applicable. DDP is a training-only construct: it exists to synchronize gradients during backward, which serving does not do. For inference, replicate the model behind a load balancer (data-parallel serving) or shard it with tensor/pipeline parallel inside an inference engine (vLLM/SGLang). See inference serving and disaggregated inference.
Fine-tuning¶
DDP is the right backend for fits-in-memory full fine-tuning: when a full copy of the model plus its optimizer state fits on one GPU, DDP gives the simplest, fastest path (fine-tuning and post-training). It also pairs with LoRA when the (frozen base + adapters) replica fits per GPU, though large bases typically need FSDP sharding instead (FSDP/SFT and LoRA). RL post-training uses DDP- or FSDP-backed trainers behind the rollout engine (RL libraries).
Cookbook (common use cases)¶
1. Single-node DDP (8 GPUs)
# torchrun spawns one process per GPU; LOCAL_RANK selects the device.
torchrun --standalone --nproc_per_node=8 train.py
# Reference template: torch + NCCL + CUDA GPUs (not executed here).
dist.init_process_group("nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
model = DDP(Model().cuda(), device_ids=[int(os.environ["LOCAL_RANK"])])
2. Multi-node DDP (4 nodes x 8 GPUs)
# identical command on every node; c10d rendezvous joins them.
torchrun --nnodes=4 --nproc_per_node=8 \
--rdzv_backend=c10d --rdzv_endpoint="$MASTER_ADDR:29500" \
--rdzv_id=run42 train.py
3. Gradient accumulation with no_sync (large effective batch)
# Reference template: torch + NCCL + CUDA GPUs (not executed here).
import contextlib
accum = 8
for i, batch in enumerate(loader):
    sync = (i + 1) % accum == 0
    with (contextlib.nullcontext() if sync else model.no_sync()):
        (model(**batch).loss / accum).backward()  # all-reduce only when sync
    if sync:
        opt.step(); opt.zero_grad()
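The "large effective batch" in this recipe is the product of three factors: the per-GPU batch size, the number of ranks, and the accumulation count. A minimal sketch of that arithmetic follows. The helper name and the dataset size are hypothetical, and the steps-per-epoch figure assumes the incomplete final step is dropped.

```python
def effective_batch(per_gpu_batch, world_size, accum=1):
    """Samples contributing to one optimizer step across all ranks."""
    return per_gpu_batch * world_size * accum

# batch_size=32 per GPU, 8 GPUs, accum=8 as in the loop above.
global_batch = effective_batch(per_gpu_batch=32, world_size=8, accum=8)
dataset_size = 1_000_000                      # hypothetical
steps_per_epoch = dataset_size // global_batch

print("effective batch per optimizer step:", global_batch)
print("full optimizer steps per epoch:", steps_per_epoch)
```

Changing the world size or the accumulation count changes the effective batch, so hyperparameters tuned for one configuration may need revisiting in another.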
Failure modes¶
Several of these are reproduced in miniature by the validated numpy blocks above: unequal shards and a skipped all-reduce (in "How to use it"), the missing 1/accum scale (in "How to integrate with it"), and a dropped reduction bucket.
- Forgetting sampler.set_epoch(epoch): every epoch sees the same shuffle, hurting convergence.
- find_unused_parameters=True left on by default: measurable throughput loss; only enable it when a parameter genuinely receives no gradient.
- All-reduce on every microbatch during grad-accum: wasted comm; wrap the non-sync steps in model.no_sync().
- Mismatched replicas (different model init / dropout RNG / buffers across ranks): silent divergence; seed consistently and let DDP broadcast initial state.
- Replica no longer fits one GPU: OOM that DDP cannot fix; this is the signal to move to FSDP (FSDP).
- ACS on / wrong NCCL_IB_HCA: NCCL off GDR, all-reduce on TCP, training crawls; verify [GDRDMA] (the NCCL-hang runbook).
- Uneven shard sizes across ranks (dataset not divisible by world size): a rank runs out of data and the collective hangs; use drop_last=True or pad. DistributedSampler pads by repeating samples by default, so the hazard mainly arises with custom samplers or iterable datasets that shard on their own.
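The mismatched-replicas failure is silent, so a periodic consistency check can help surface it. The sketch below hashes each replica's parameters and reports the ranks whose checksum differs from rank 0. It is a pure-Python illustration with hypothetical function names. In a real job each rank would compute its own checksum and exchange it with a collective such as torch.distributed.all_gather_object, which is assumed here rather than shown.

```python
import hashlib
import struct

def param_checksum(params):
    """SHA-256 over the raw float64 bytes of a list of parameter vectors."""
    h = hashlib.sha256()
    for p in params:
        h.update(struct.pack(f"<{len(p)}d", *p))
    return h.hexdigest()

def diverged_ranks(checksums):
    """Ranks whose checksum differs from rank 0's."""
    reference = checksums[0]
    return [rank for rank, c in enumerate(checksums) if c != reference]

# Four simulated replicas; rank 2 has drifted by a tiny amount.
base = [[0.1, 0.2, 0.3], [1.0, -1.0]]
replicas = [[list(p) for p in base] for _ in range(4)]
replicas[2][1][0] += 1e-9

checksums = [param_checksum(r) for r in replicas]
bad = diverged_ranks(checksums)
print("diverged ranks:", bad)
```

A bitwise hash flags even one-ulp drift, which fits DDP because replicas are expected to stay bit-identical. Hashing every parameter has a cost, so a check like this would normally run every N steps rather than on each one.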
References¶
- PyTorch DDP notes: https://docs.pytorch.org/docs/stable/notes/ddp.html
- DistributedDataParallel API: https://docs.pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html
- DDP getting-started tutorial: https://docs.pytorch.org/tutorials/intermediate/ddp_tutorial.html
- "PyTorch Distributed: Experiences on Accelerating Data Parallel Training" (arXiv 2006.15704): https://arxiv.org/abs/2006.15704
- NCCL env vars: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/env.html
Related: Distributed training · FSDP · DeepSpeed/ZeRO · Tensor parallel · Pipeline parallel · Training recipes · Slurm · Performance · Glossary