
DDP (distributed data parallel)

Scope: PyTorch DistributedDataParallel, which replicates the model on every GPU, shards the data, and all-reduces gradients each step. It is the simplest, fastest data-parallel strategy when the model fits on one GPU.

Reference templates use real APIs (torch, NCCL, torchrun): pin versions and validate before production use. The torch snippets below need CUDA GPUs and are not executed here; each core algorithm they teach is mirrored by a runnable, asserted numpy block (run with python3) that validates the underlying math.

What it is

DDP runs an identical replica of the model on every rank. Each rank processes a different shard of the global batch (distinct data), runs forward and backward independently, then all-reduces the gradients across all ranks so every replica applies the same averaged update and stays in lockstep. Parameters and optimizer state are not sharded: each GPU holds a full copy (contrast FSDP, which shards them). Communication is one gradient all-reduce per step, issued bucket by bucket so that it overlaps with the backward pass.

The API is torch.nn.parallel.DistributedDataParallel, a module wrapper that registers autograd hooks to trigger the gradient all-reduce. It is the baseline data-parallel axis in distributed training and the replication pattern that HSDP applies across nodes (see FSDP).

Why use it

  • Simplest and fastest when the model fits. One logical collective per step (the gradient all-reduce), largely overlapped with backward: minimal communication and near-linear scaling on a good fabric. Nothing to shard, nothing to gather.
  • Lower comm than FSDP in the fits-in-memory regime: DDP does a single gradient all-reduce per step, while FSDP all-gathers each layer's parameters (in forward and again in backward) and reduce-scatters its gradients.
  • Native, batteries-included. DistributedDataParallel + torchrun + DistributedSampler is the canonical PyTorch multi-GPU recipe; every higher-level trainer builds on it.
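
A back-of-the-envelope sketch of the per-step traffic behind that comparison. It assumes ring collectives (NCCL may pick tree algorithms instead) and FSDP resharding parameters after forward, so it all-gathers them twice per step. The function names are illustrative, not a PyTorch API.

```python
def ring_reduce_scatter_elems(n_params: int, world: int) -> float:
    """Elements each rank sends in a ring reduce-scatter (same as a ring all-gather)."""
    return (world - 1) / world * n_params

def ddp_step_elems(n_params: int, world: int) -> float:
    # DDP: one all-reduce = reduce-scatter + all-gather over every gradient.
    return 2 * ring_reduce_scatter_elems(n_params, world)

def fsdp_step_elems(n_params: int, world: int) -> float:
    # FSDP (reshard after forward): all-gather params in forward, all-gather
    # them again in backward, then reduce-scatter the gradients.
    return 3 * ring_reduce_scatter_elems(n_params, world)

P, W = 1_000_000_000, 8          # hypothetical: 1B parameters on 8 GPUs
ratio = fsdp_step_elems(P, W) / ddp_step_elems(P, W)
print(f"DDP:  {ddp_step_elems(P, W):.3e} elements sent per rank per step")
print(f"FSDP: {fsdp_step_elems(P, W):.3e} ({ratio:.2f}x DDP)")
```

Under these assumptions the ratio does not depend on model or world size; DDP keeps that advantage only while a full replica fits.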

When to use it (and when not)

  • Use DDP when one full copy of the model + optimizer state + a microbatch of activations fits on a single GPU. That covers most models up to a few billion parameters. Activation checkpointing lowers the activation term, but it cannot shrink the parameter, gradient, or optimizer state that every replica holds.
  • Switch to FSDP the moment a full replica no longer fits: DDP cannot help there because it never shards parameters or optimizer state.
  • Combine, do not replace. Under HSDP, DDP-style replication is the inter-node dimension over FSDP shards; under TP (tensor parallelism) or PP (pipeline parallelism), DDP is the outer data-parallel wrapper around model-parallel groups.
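
A rough per-GPU memory check for the fits-on-one-GPU rule above. It assumes mixed-precision AdamW at about 16 bytes per parameter (bf16 weights and gradients, an fp32 master copy, and two fp32 Adam moments; fp32 weights and gradients under autocast also total 16) plus an activation figure you measure or estimate separately. The helper names and the 10% headroom are hypothetical.

```python
GIB = 1024 ** 3

# Assumed bytes per parameter for mixed-precision AdamW:
# 2 (bf16 weights) + 2 (bf16 grads) + 4 (fp32 master) + 4 + 4 (Adam moments).
BYTES_PER_PARAM = 2 + 2 + 4 + 4 + 4

def replica_bytes(n_params: int) -> int:
    """Parameter + gradient + optimizer state that DDP keeps on every GPU."""
    return n_params * BYTES_PER_PARAM

def fits_on_one_gpu(n_params: int, activation_bytes: int, gpu_bytes: int,
                    headroom: float = 0.9) -> bool:
    """True if a full replica plus activations fits, leaving an assumed 10%
    for the CUDA context, NCCL buffers, and allocator fragmentation."""
    return replica_bytes(n_params) + activation_bytes <= headroom * gpu_bytes

for n in (1_000_000_000, 3_000_000_000, 7_000_000_000):
    print(f"{n / 1e9:.0f}B params -> {replica_bytes(n) / GIB:.1f} GiB of state;",
          "fits an 80 GiB GPU with 10 GiB of activations:",
          fits_on_one_gpu(n, 10 * GIB, 80 * GIB))
```

Activation checkpointing only shrinks the activation_bytes argument; once replica_bytes alone exceeds the GPU, the answer is FSDP.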

Architecture

flowchart TB
  subgraph G0["GPU 0"]
    R0["Full model replica"]
    B0["Backward grads"]
  end
  subgraph G1["GPU 1"]
    R1["Full model replica"]
    B1["Backward grads"]
  end
  subgraph G2["GPU N"]
    R2["Full model replica"]
    B2["Backward grads"]
  end
  R0 --> B0
  R1 --> B1
  R2 --> B2
  B0 ==>|"all-reduce grads"| AR["Averaged gradient"]
  B1 ==>|"all-reduce grads"| AR
  B2 ==>|"all-reduce grads"| AR
  AR -.->|"identical update"| R0
  AR -.->|"identical update"| R1
  AR -.->|"identical update"| R2
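
The "all-reduce grads" arrow hides the algorithm. Below is a minimal plain-Python simulation of one classic choice, the ring all-reduce (a reduce-scatter followed by an all-gather), where each list stands in for one rank's gradient. NCCL chooses among ring, tree, and other algorithms at runtime, so treat this as an illustration of the collective's contract, not of what NCCL executes; ring_allreduce_mean is a hypothetical name.

```python
import random

def ring_allreduce_mean(grads):
    """Simulate a ring all-reduce over equal-length lists; return each rank's mean."""
    n, length = len(grads), len(grads[0])
    bounds = [i * length // n for i in range(n + 1)]
    chunks = [range(bounds[i], bounds[i + 1]) for i in range(n)]  # n contiguous chunks
    buf = [list(g) for g in grads]                                # each rank's working copy

    # Phase 1, reduce-scatter: at step s rank r sends chunk (r - s) % n to rank r + 1,
    # which adds it in. Afterwards rank r owns the full sum of chunk (r + 1) % n.
    for s in range(n - 1):
        msgs = [(r, (r - s) % n) for r in range(n)]
        payload = [[buf[r][i] for i in chunks[c]] for r, c in msgs]  # snapshot before adds
        for (r, c), vals in zip(msgs, payload):
            for i, v in zip(chunks[c], vals):
                buf[(r + 1) % n][i] += v

    # Phase 2, all-gather: rank r forwards finished chunk (r + 1 - s) % n and the
    # receiver overwrites its partial copy with the fully reduced one.
    for s in range(n - 1):
        msgs = [(r, (r + 1 - s) % n) for r in range(n)]
        payload = [[buf[r][i] for i in chunks[c]] for r, c in msgs]
        for (r, c), vals in zip(msgs, payload):
            for i, v in zip(chunks[c], vals):
                buf[(r + 1) % n][i] = v

    return [[x / n for x in b] for b in buf]   # DDP averages, so divide by world size

rng = random.Random(0)
world, dim = 4, 10
grads = [[rng.gauss(0, 1) for _ in range(dim)] for _ in range(world)]
out = ring_allreduce_mean(grads)
expected = [sum(g[i] for g in grads) / world for i in range(dim)]
assert all(o == out[0] for o in out)
assert all(abs(a - b) < 1e-12 for a, b in zip(out[0], expected))
print("ring all-reduce matches the direct mean on all", world, "ranks")
```

Because the all-gather phase copies one reduced value to every rank, all replicas end with bit-identical averaged gradients, which is what keeps them in lockstep.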

How to use it

Initialize the process group, wrap the model, and shard the data with DistributedSampler. torchrun sets RANK/LOCAL_RANK/WORLD_SIZE.

# Reference template: torch + NCCL + CUDA GPUs (not executed here).
# torchrun --standalone --nproc_per_node=8 train.py
# Model, dataset, and epochs are placeholders for your own definitions.
import os, torch, torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

dist.init_process_group("nccl")                 # one process per GPU
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

model = Model().to(local_rank)
model = DDP(model, device_ids=[local_rank])     # registers grad all-reduce hooks

sampler = DistributedSampler(dataset)           # disjoint shard per rank
loader = DataLoader(dataset, batch_size=32, sampler=sampler)  # per-rank batch size

opt = torch.optim.AdamW(model.parameters(), lr=2e-5)
for epoch in range(epochs):
    sampler.set_epoch(epoch)                     # new, rank-consistent shuffle each epoch
    for batch in loader:
        batch = {k: v.to(local_rank) for k, v in batch.items()}  # assumes a dict of tensors
        loss = model(**batch).loss
        loss.backward()                          # bucketed all-reduce overlaps backward
        opt.step(); opt.zero_grad()
dist.destroy_process_group()

Validated (numpy, runnable). DDP's correctness rests on one identity: averaging the per-rank gradients (each computed on an equal shard) equals the gradient of the mean loss over the whole global batch, so every replica applies the same update and stays in lockstep. The block below proves that equivalence against a single-GPU reference, confirms lockstep, and catches two real failure modes: unequal shards (which corrupt the average) and a skipped all-reduce (which lets replicas diverge).

import numpy as np

rng = np.random.default_rng(0)
D, N, R = 4, 12, 3           # 4 features, 12 samples, 3 ranks (12 / 3 = 4 per shard)
X = rng.standard_normal((N, D))
y = rng.standard_normal(N)
w = rng.standard_normal(D)

def mse_grad(Xb, yb, w):
    # gradient of mean((Xb @ w - yb) ** 2) wrt w
    err = Xb @ w - yb
    return (2.0 / len(yb)) * (Xb.T @ err)

# Reference: the full-batch gradient (single-GPU ground truth).
g_full = mse_grad(X, y, w)

# DDP: each rank computes its local-mean gradient on an equal shard,
# then all-reduce AVERAGES the per-rank gradients.
shards = np.array_split(np.arange(N), R)
assert all(len(s) == N // R for s in shards)        # equal-shard precondition
g_ranks = [mse_grad(X[s], y[s], w) for s in shards]
g_ddp = np.mean(g_ranks, axis=0)                    # all-reduce(mean)

# Equivalence to the slow single-GPU reference (the DDP correctness guarantee).
assert np.allclose(g_ddp, g_full, atol=1e-12), (g_ddp, g_full)

# Lockstep: identical start + identical averaged grad => identical replicas.
lr = 0.1
replicas = [w.copy() - lr * g_ddp for _ in range(R)]
for wi in replicas:
    assert np.array_equal(wi, replicas[0])          # every replica bit-identical

# Adversarial 1: unequal shards break the average (why drop_last / padding matters).
bad = [np.arange(0, 2), np.arange(2, 4), np.arange(4, N)]   # sizes 2, 2, 8
g_bad = np.mean([mse_grad(X[s], y[s], w) for s in bad], axis=0)
assert not np.allclose(g_bad, g_full, atol=1e-6)    # uneven shards => wrong gradient

# Adversarial 2: skipping the all-reduce diverges the replicas.
no_sync = [w - lr * g_ranks[r] for r in range(R)]
assert not np.array_equal(no_sync[0], no_sync[1])   # each rank drifts on its own shard

print("block1 OK: g_ddp==g_full, replicas in lockstep, unequal-shard + no-sync caught")
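
DistributedSampler is what gives every rank an equal-sized shard. The sketch below mirrors its padding and striding logic as found in the PyTorch source (an assumption worth re-checking against your pinned version), with random.Random standing in for torch.randperm, so the exact permutation differs from PyTorch's. shard_indices is a hypothetical name.

```python
import math
import random

def shard_indices(n, world, rank, epoch, seed=0, shuffle=True, drop_last=False):
    """Indices `rank` would iterate in `epoch` (hypothetical mirror of DistributedSampler)."""
    if drop_last and n % world != 0:
        num_samples = math.ceil((n - world) / world)   # per-rank count after dropping the tail
    else:
        num_samples = math.ceil(n / world)             # per-rank count after padding
    total = num_samples * world
    indices = list(range(n))
    if shuffle:
        # Same seed + epoch on every rank => one shared permutation; set_epoch changes it.
        random.Random(seed + epoch).shuffle(indices)
    if drop_last:
        indices = indices[:total]                      # discard the uneven tail
    else:
        indices += (indices * math.ceil(total / n))[: total - n]  # pad by repeating from the front
    return indices[rank:total:world]                   # stride: each rank takes every world-th index

for r in range(4):
    print(r, shard_indices(10, 4, r, epoch=0))
```

With 10 samples on 4 ranks, the default pads to 12 (two samples seen twice) and drop_last=True trims to 8; either way every rank gets the same count. Forgetting set_epoch means the epoch argument never changes, so every epoch replays the same permutation.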

How to integrate with it

DDP integrates at two levels: the low-level training-loop knobs it exposes, and the higher-level trainers that wrap it.

Training-loop knobs

  • Gradient bucketing. DDP groups gradients into buckets (default bucket_cap_mb=25), in roughly the reverse order of model.parameters() to approximate the order in which backward produces them, and launches an all-reduce for a bucket as soon as every gradient in it is ready. This overlaps communication with the remaining backward compute. Tune bucket_cap_mb to balance overlap (smaller buckets start sooner) against collective efficiency (larger buckets amortize launch cost).

Validated (numpy, runnable). Bucketing must be transparent: reducing per bucket and stitching the results back in parameter order has to equal one flat all-reduce over all gradients, for any bucket size. The block proves this across several caps and shows that a single unreduced bucket is detectable. That case models a genuinely dropped collective, which is a correctness bug; it is distinct from a GDR-to-TCP fallback, which still all-reduces correctly, only slower.

import numpy as np

rng = np.random.default_rng(2)
R = 4                                   # ranks
# Per-parameter gradients, one list per rank. Shapes mimic layers of varying size.
shapes = [(3,), (2, 4), (5,), (1,), (3, 3)]
grads = [[rng.standard_normal(s) for s in shapes] for _ in range(R)]

def flat_allreduce(grads):
    # ground truth: average the fully-flattened gradient across ranks.
    flats = [np.concatenate([g.ravel() for g in rank]) for rank in grads]
    return np.mean(flats, axis=0)

def bucketed_allreduce(grads, cap):
    # group consecutive params into buckets of up to `cap` elements (a larger param gets its own bucket), reduce per bucket.
    nparam = len(grads[0])
    buckets, cur, cur_sz = [], [], 0
    for p in range(nparam):
        sz = grads[0][p].size
        if cur and cur_sz + sz > cap:
            buckets.append(cur)
            cur, cur_sz = [], 0
        cur.append(p)
        cur_sz += sz
    if cur:
        buckets.append(cur)
    out = [None] * nparam
    for b in buckets:                   # reduce each bucket, then stitch back in order.
        stacked = [np.concatenate([grads[r][p].ravel() for p in b]) for r in range(R)]
        red = np.mean(stacked, axis=0)
        off = 0
        for p in b:
            n = grads[0][p].size
            out[p] = red[off:off + n]
            off += n
    return np.concatenate(out)

ref = flat_allreduce(grads)

# Any bucket cap must reconstruct the identical reduction (bucketing is transparent).
for cap in (1, 4, 7, 1000):
    got = bucketed_allreduce(grads, cap)
    assert np.allclose(got, ref, atol=1e-12), cap

# Adversarial: a bucket that never gets reduced (comm dropped) => wrong average.
broken = bucketed_allreduce(grads, cap=4).copy()
n0 = grads[0][0].size
broken[:n0] = grads[0][0].ravel()               # rank-0 only, unreduced
assert not np.allclose(broken, ref, atol=1e-6)  # a missing reduction is detectable

print("block3 OK: bucketed all-reduce == flat all-reduce; dropped bucket caught")
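
Why bucket_cap_mb is a trade-off shows up in a toy timing model. It assumes backward produces gradient bytes at a fixed rate, each bucket's all-reduce costs a fixed launch latency plus bytes divided by bandwidth, and collectives run one at a time on a single communication stream. All numbers and the function name are hypothetical, not measured DDP behavior.

```python
MIB = 1024 * 1024

def backward_plus_comm_time(layer_bytes, cap_bytes, compute_s_per_byte,
                            latency_s, bw_bytes_per_s):
    """Time until backward is done and the last bucket's all-reduce has finished.

    layer_bytes lists gradient sizes in the order backward produces them.
    A bucket closes once it holds at least cap_bytes (or when backward ends);
    its all-reduce starts when it closes and the single comm stream is idle.
    """
    t = 0.0          # backward compute clock
    comm_free = 0.0  # when the comm stream next becomes idle
    pending = 0      # bytes in the currently open bucket
    for nbytes in layer_bytes:
        t += nbytes * compute_s_per_byte
        pending += nbytes
        if pending >= cap_bytes:
            start = max(t, comm_free)
            comm_free = start + latency_s + pending / bw_bytes_per_s
            pending = 0
    if pending:                                  # flush the last partial bucket
        start = max(t, comm_free)
        comm_free = start + latency_s + pending / bw_bytes_per_s
    return max(t, comm_free)

layers = [4 * MIB] * 100                         # hypothetical: 100 layers, 4 MiB of grads each
args = dict(compute_s_per_byte=1e-10, latency_s=1e-4, bw_bytes_per_s=10e9)
for cap_mb in (1, 25, 10_000):
    total = backward_plus_comm_time(layers, cap_mb * MIB, **args)
    print(f"cap {cap_mb:>6} MiB -> step tail at {total * 1e3:.1f} ms")
```

With these made-up numbers the 25 MiB cap finishes first: the smallest cap yields one bucket per layer and pays the per-collective latency 100 times, while a single huge bucket cannot start until backward ends. Real timings depend on kernel scheduling and NCCL's algorithm choice, so profile rather than trust the toy.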
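
  • Gradient accumulation with no_sync. To accumulate over K microbatches, run the first K-1 backward passes inside model.no_sync() (local accumulation only) and let the K-th one all-reduce the accumulated gradients; this saves K-1 collectives per effective step. Scale each microbatch loss by 1/K so the result matches the mean over the large batch.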
# Reference template: torch + NCCL + CUDA GPUs (not executed here).
import contextlib
accum = 4
for i, batch in enumerate(loader):
    sync = (i % accum == accum - 1)              # all-reduce on every accum-th microbatch
    ctx = model.no_sync() if not sync else contextlib.nullcontext()
    with ctx:
        loss = model(**batch).loss / accum       # scale so the summed grads average
        loss.backward()                          # inside no_sync: local accumulate only
    if sync:
        opt.step(); opt.zero_grad()              # backward above all-reduced the accumulated grads

Validated (numpy, runnable). Accumulating K microbatch gradients, each scaled by 1/K, then applying one step must equal a single backward over the concatenated K-microbatch batch. The block proves that equivalence and catches the classic bug: dropping the 1/accum scaling inflates the gradient exactly K-fold.

import numpy as np

rng = np.random.default_rng(1)
D, K, m = 5, 4, 3            # D features, K microbatches, m samples each
X = rng.standard_normal((K * m, D))
y = rng.standard_normal(K * m)
w = rng.standard_normal(D)

def mse_grad(Xb, yb, w):
    err = Xb @ w - yb
    return (2.0 / len(yb)) * (Xb.T @ err)

# Reference: one backward over the full (K*m) effective batch.
g_ref = mse_grad(X, y, w)

# no_sync accumulation: K equal microbatches, each scaled by 1/K, summed locally;
# a single all-reduce fires on the last microbatch (one rank here => identity).
micro = np.array_split(np.arange(K * m), K)
assert all(len(s) == m for s in micro)
g_accum = sum(mse_grad(X[s], y[s], w) / K for s in micro)

assert np.allclose(g_accum, g_ref, atol=1e-12), (g_accum, g_ref)

# Adversarial: forgetting the /K scaling inflates the gradient K-fold.
g_noscale = sum(mse_grad(X[s], y[s], w) for s in micro)
assert not np.allclose(g_noscale, g_ref, atol=1e-6)
assert np.allclose(g_noscale, K * g_ref, atol=1e-12)   # exact corruption signature

print("block2 OK: no_sync accumulation == full-batch grad; missing /K caught (Kx)")
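
  • find_unused_parameters=True lets DDP handle graphs where some parameters get no gradient (e.g. conditional branches) by traversing the autograd graph after every forward. That traversal is a real throughput hit, so leave it False unless DDP raises the "Expected to have finished reduction in the prior iteration" error, which indicates parameters that did not contribute to the loss.
  • static_graph=True tells DDP the graph is identical every iteration (the same parameters are used or unused each time), so it can avoid searching for unused parameters on every iteration and supports patterns such as activation checkpointing that plain DDP rejects. Only set it if that invariant truly holds.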
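
A conceptual sketch of what find_unused_parameters=True pays for: after each forward, DDP walks the autograd graph from the outputs to find which parameters participated, so it can mark the rest ready instead of waiting forever on their buckets. The graph here is a hypothetical dict of node to inputs, not PyTorch's autograd objects.

```python
from collections import deque

def used_parameters(graph, outputs, params):
    """Breadth-first walk from the output nodes; return the params reachable from them."""
    seen, queue = set(outputs), deque(outputs)
    while queue:
        node = queue.popleft()
        for parent in graph.get(node, ()):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return {p for p in params if p in seen}

params = {"embed.w", "branch_a.w", "branch_b.w", "head.w"}
# One forward where a conditional took branch A: branch_b.w never touches the loss.
graph = {
    "loss": ["head_out"],
    "head_out": ["head.w", "a_out"],
    "a_out": ["branch_a.w", "emb_out"],
    "emb_out": ["embed.w"],
}
used = used_parameters(graph, ["loss"], params)
unused = params - used
print("unused this iteration:", sorted(unused))
```

The walk is proportional to the graph size and repeats every iteration, which is why leaving the flag on by default costs throughput.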

Higher-level trainers

Every mainstream trainer builds on this same DistributedDataParallel + torchrun core, so integration is mostly a matter of letting the framework own the wrapper:

  • Hugging Face Trainer / accelerate. Launch with torchrun or accelerate launch; the trainer wraps the model in DDP automatically and exposes the same knobs through TrainingArguments as ddp_find_unused_parameters and ddp_bucket_cap_mb.
  • PyTorch Lightning. Trainer(strategy="ddp", devices=8) performs the process-group init, the DDP wrapping, and the DistributedSampler injection for you.
  • RL and SFT/LoRA stacks run DDP- or FSDP-backed trainers under the hood: you select the backend, they own the collective (see RL libraries, and SFT and LoRA).

How to run it in production

Production DDP is a launch-orchestration and fabric problem: the training code is unchanged, and what matters is how ranks are started, how the all-reduce reaches the wire, and how the job survives a failed rank.

Launch orchestration (Slurm)

#!/bin/bash
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-node=8
#SBATCH --exclusive
export MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n1)
srun torchrun --nnodes=$SLURM_NNODES --nproc_per_node=8 \
  --rdzv_backend=c10d --rdzv_endpoint="$MASTER_ADDR:29500" \
  --rdzv_id=$SLURM_JOB_ID train.py
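
For a homogeneous job like the one above, the rank variables torchrun hands each worker follow simple arithmetic. A sketch assuming every node runs the same --nproc_per_node and that the rendezvous has already assigned this node a group rank (c10d rendezvous does not promise it matches SLURM_NODEID); worker_env is illustrative.

```python
def worker_env(node_rank: int, local_rank: int, nnodes: int, nproc_per_node: int) -> dict:
    """Rank variables one worker would see in a homogeneous torchrun job (assumed)."""
    assert 0 <= node_rank < nnodes and 0 <= local_rank < nproc_per_node
    return {
        "LOCAL_RANK": local_rank,                         # GPU index on this node
        "RANK": node_rank * nproc_per_node + local_rank,  # global rank
        "WORLD_SIZE": nnodes * nproc_per_node,
        "LOCAL_WORLD_SIZE": nproc_per_node,
    }

# 4 nodes x 8 GPUs, as in the sbatch script: global ranks 0..31, each exactly once.
ranks = [worker_env(n, l, 4, 8)["RANK"] for n in range(4) for l in range(8)]
assert sorted(ranks) == list(range(32))
print(worker_env(2, 5, 4, 8))
```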

See the Slurm page for more Slurm/torchrun launch patterns, and distributed-training recipes for end-to-end examples.

Hardware and fabric

The per-step gradient all-reduce dominates DDP's communication (beyond it, DDP only broadcasts the initial state and, with broadcast_buffers=True, module buffers each forward), so the fabric is the whole game:

  • NVLink / NVSwitch carries the intra-node portion of the all-reduce: NCCL builds a ring/tree that uses NVLink within a node before crossing the NIC (see performance tuning).
  • InfiniBand / RoCE + GPUDirect RDMA carries the inter-node all-reduce. Set NCCL_IB_HCA and NCCL_NET_GDR_LEVEL=SYS; verify GDR via NCCL_DEBUG=INFO showing [GDRDMA] (see networking fabric).
  • NCCL_NVLS_ENABLE=1 enables NVLink SHARP in-switch reduction for the all-reduce on NVSwitch systems.
  • Bucket size (bucket_cap_mb) trades comm/compute overlap against per-collective efficiency, the main DDP-specific knob.
  • Disable PCIe ACS on bare-metal hosts: with ACS on, P2P and GPUDirect RDMA traffic is redirected through the CPU root complex, which cuts all-reduce bandwidth sharply or hangs NCCL.

Checkpointing and fault tolerance

  • Checkpoint from one rank. Because every replica is identical, save model and optimizer state from rank 0 only (guard with if dist.get_rank() == 0) and wrap save/load in dist.barrier() so no rank races ahead. On resume, load with map_location onto each rank's local device and restore the optimizer state, the DistributedSampler epoch, and the RNG so the run continues deterministically.
  • Elastic restarts. torchrun is built on TorchElastic: with --rdzv_backend=c10d and --max-restarts=N, it restarts all workers and re-forms the process group after a rank dies, instead of forfeiting the whole job. Restarted workers run the script from the top, so the script itself must reload the latest checkpoint.
  • Fail loud, not hung. Set TORCH_NCCL_ASYNC_ERROR_HANDLING=1 and pass a timeout to init_process_group so a dead or straggling rank aborts the collective with an error instead of hanging the world indefinitely (see the NCCL-hang runbook).
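
One detail of checkpointing from rank 0: a DDP-wrapped model's state_dict() prefixes every key with "module.", so a checkpoint saved from the wrapper will not load directly into an unwrapped model. Saving model.module.state_dict() avoids the prefix; for checkpoints that already carry it, here is a sketch of a hypothetical helper that strips it (plain lists stand in for tensors).

```python
def strip_ddp_prefix(state_dict: dict, prefix: str = "module.") -> dict:
    """Return a copy with the DDP wrapper prefix removed from every key that starts with it."""
    return {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in state_dict.items()
    }

saved = {"module.encoder.weight": [0.1, 0.2], "module.head.bias": [0.0]}
print(strip_ddp_prefix(saved))
```

Only a leading prefix is removed, so a nested submodule that happens to be named module keeps its key intact.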

How to maintain it

A DDP job is only as healthy as its weakest rank; maintenance is about keeping all ranks identical, observable, and in lockstep over time.

  • Pin versions across every node. All ranks must run the same PyTorch, CUDA, and NCCL build; a mismatched NCCL or torch across hosts causes silent hangs or divergent numerics. Upgrade the whole job as one coordinated redeploy, never a rolling per-node bump.
  • Keep replicas identical. DDP broadcasts the initial module state from rank 0 at construction so every replica starts bit-identical; seed all ranks the same, keep broadcast_buffers=True (the default) so buffers like BatchNorm statistics stay in sync, and call sampler.set_epoch(epoch) each epoch so every epoch gets a fresh shuffle that is still consistent across ranks.
  • Watch for stragglers. The all-reduce is a barrier, so the slowest rank sets the step time. Track per-step all-reduce/wait time and per-GPU utilization: a single slow NIC, a thermal-throttling GPU, or a noisy neighbor stalls the whole world. Log metrics from rank 0 to avoid duplicating telemetry from every rank.
  • Verify the fabric after every change. Re-check NCCL_DEBUG=INFO for [GDRDMA] and PCIe ACS state after any driver, firmware, or topology change, since a regression here silently pushes the all-reduce off GPUDirect RDMA (or onto TCP sockets) and throughput collapses.
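
Because the all-reduce waits for every rank, step time is roughly the slowest rank's compute plus the exposed communication. A toy model with made-up timings (not measurements), assuming no compute/communication overlap, shows how one throttled GPU sets the pace for the whole world and what the per-rank wait metric would reveal:

```python
def step_time(per_rank_compute_s, allreduce_s):
    """Synchronous DDP step: every rank waits for the slowest, then communicates."""
    return max(per_rank_compute_s) + allreduce_s

def straggler_wait(per_rank_compute_s):
    """Idle time each rank spends blocked in the collective waiting for the slowest."""
    slowest = max(per_rank_compute_s)
    return [slowest - t for t in per_rank_compute_s]

healthy = [0.100] * 32
one_slow = [0.100] * 31 + [0.130]      # hypothetical: one thermally throttled GPU
print("healthy step:", step_time(healthy, 0.02), "s; one straggler:", step_time(one_slow, 0.02), "s")
print("max wait on a healthy rank:", max(straggler_wait(one_slow)[:31]), "s")
```

In per-rank telemetry the straggler is the one rank with near-zero wait while every other rank shows the same large wait.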

How to scale it

DDP scales to multiple nodes by widening the world with torchrun. The training code is unchanged: once the per-step gradient all-reduce starts crossing the inter-node network, only the launcher and the fabric matter.

# run on every node; rendezvous coordinates ranks across hosts.
torchrun \
  --nnodes=4 --nproc_per_node=8 \
  --rdzv_backend=c10d --rdzv_endpoint="$MASTER_ADDR:29500" \
  --rdzv_id=ddp-job train.py

Multi-node DDP is all-reduce-bound: the gradient all-reduce traverses InfiniBand/RoCE every step, so fabric bandwidth and NCCL configuration (see production above) gate scaling efficiency. When a full replica stops fitting, move to FSDP or HSDP (see FSDP).
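
Why near-linear scaling is plausible on a good fabric: in the ring model, each rank's all-reduce traffic 2(N-1)/N x S approaches 2S as N grows, so per-step communication time stays roughly flat while per-rank compute is constant. A bandwidth-only sketch that ignores latency (which grows with N in a ring) and NCCL's tree/NVLS algorithms; the gradient size and per-GPU bandwidth are assumed numbers.

```python
def ring_allreduce_seconds(grad_bytes: float, world: int, bw_bytes_per_s: float) -> float:
    """Bandwidth-only ring all-reduce time: each rank sends 2(N-1)/N of the data."""
    return 2 * (world - 1) / world * grad_bytes / bw_bytes_per_s

GRAD_BYTES = 2 * 500_000_000   # assumed: 500M params with bf16 grads (fp32 grads double this)
BW = 50e9                       # assumed: ~400 Gb/s of NIC bandwidth per GPU
for world in (8, 32, 128, 512):
    print(f"{world:>4} GPUs: {ring_allreduce_seconds(GRAD_BYTES, world, BW) * 1e3:.1f} ms per step")
```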

Inference

Not applicable. DDP is a training-only construct: it exists to synchronize gradients during backward, which serving does not do. For inference, replicate the model behind a load balancer (data-parallel serving) or shard it with tensor/pipeline parallel inside an inference engine (vLLM/SGLang). See inference serving and disaggregated inference.

Fine-tuning

DDP is the right backend for fits-in-memory full fine-tuning: when a full copy of the model plus its optimizer state fits on one GPU, DDP gives the simplest, fastest path (see fine-tuning and post-training). It also pairs with LoRA when the replica (frozen base plus adapters) fits on each GPU, though large bases typically need FSDP sharding instead (see FSDP, and SFT and LoRA). RL post-training uses DDP- or FSDP-backed trainers behind the rollout engine (see RL libraries).
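
DDP only buckets parameters with requires_grad=True, so with LoRA the per-step all-reduce shrinks to the adapter gradients. A sketch of the arithmetic, assuming rank-r adapters on square d x d projection matrices (A is r x d, B is d x r); the model dimensions are hypothetical.

```python
def lora_params(d: int, r: int, n_matrices: int) -> int:
    """Trainable params for rank-r LoRA on n square d x d matrices: A (r x d) + B (d x r)."""
    return n_matrices * 2 * d * r

def full_params(d: int, n_matrices: int) -> int:
    """Params of the same matrices under full fine-tuning."""
    return n_matrices * d * d

# Hypothetical model: 32 layers x 4 adapted projections of size 4096 x 4096, rank 16.
d, r, n = 4096, 16, 32 * 4
frac = lora_params(d, r, n) / full_params(d, n)
print(f"LoRA grads to all-reduce: {lora_params(d, r, n):,} "
      f"({frac:.2%} of the adapted matrices' full gradient)")
```

The ratio reduces to 2r/d, so communication is rarely the bottleneck for LoRA under DDP; memory for the frozen base is what decides between DDP and FSDP.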

Cookbook (common use cases)

1. Single-node DDP (8 GPUs)

# torchrun spawns one process per GPU; LOCAL_RANK selects the device.
torchrun --standalone --nproc_per_node=8 train.py
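# Reference template: torch + NCCL + CUDA GPUs (not executed here).
import os, torch, torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

dist.init_process_group("nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
model = DDP(Model().cuda(), device_ids=[local_rank])   # Model is a placeholder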

2. Multi-node DDP (4 nodes x 8 GPUs)

# identical command on every node; c10d rendezvous joins them.
torchrun --nnodes=4 --nproc_per_node=8 \
  --rdzv_backend=c10d --rdzv_endpoint="$MASTER_ADDR:29500" \
  --rdzv_id=run42 train.py

3. Gradient accumulation with no_sync (large effective batch)

# Reference template: torch + NCCL + CUDA GPUs (not executed here).
import contextlib
accum = 8
for i, batch in enumerate(loader):
    sync = (i + 1) % accum == 0
    with (contextlib.nullcontext() if sync else model.no_sync()):
        (model(**batch).loss / accum).backward()   # all-reduce only when sync
    if sync:
        opt.step(); opt.zero_grad()

Failure modes

Several of these are reproduced in miniature by the validated numpy blocks above: unequal shards and a skipped all-reduce (in "How to use it"), the missing 1/accum scale (in "How to integrate with it"), and a dropped reduction bucket.

  • Forgetting sampler.set_epoch(epoch): every epoch sees the same shuffle, hurting convergence.
  • find_unused_parameters=True left on by default: measurable throughput loss; only enable it when a parameter genuinely receives no gradient.
  • All-reduce on every microbatch during grad-accum: wasted comm; wrap the non-sync steps in model.no_sync().
  • Mismatched replicas (different model init / dropout RNG / buffers across ranks): silent divergence; seed consistently and let DDP broadcast initial state.
  • Replica no longer fits one GPU: OOM that DDP cannot fix; this is the signal to move to FSDP (FSDP).
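  • ACS on / wrong NCCL_IB_HCA: NCCL loses GPUDirect RDMA (ACS) or falls back to TCP sockets (no usable HCA), and training crawls; verify [GDRDMA] in NCCL_DEBUG=INFO output (see the NCCL-hang runbook).
  • Uneven inputs across ranks: if one rank runs out of batches, the others block in the all-reduce and the job hangs. DistributedSampler equalizes shard sizes by default (drop_last=False pads by repeating samples; drop_last=True drops the tail), so this mostly bites with custom samplers or iterable datasets; equalize them, or wrap the loop in DDP's join() context manager.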

References

  • PyTorch DDP notes: https://docs.pytorch.org/docs/stable/notes/ddp.html
  • DistributedDataParallel API: https://docs.pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html
  • DDP getting-started tutorial: https://docs.pytorch.org/tutorials/intermediate/ddp_tutorial.html
  • "PyTorch Distributed: Experiences on Accelerating Data Parallel Training" (arXiv 2006.15704): https://arxiv.org/abs/2006.15704
  • NCCL env vars: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/env.html

Related: Distributed training · FSDP · DeepSpeed/ZeRO · Tensor parallel · Pipeline parallel · Training recipes · Slurm · Performance · Glossary