Pipeline parallelism (PP)¶
Scope: splitting a model's layer stack into stages across devices/nodes and streaming micro-batches through them. The third axis of distributed training's 3D parallelism, complementing tensor parallelism and data parallelism (FSDP / DDP).
The
torchand Megatron snippets below are reference templates on real APIs: pin versions and validate before production use. Each is paired with a runnable, numpy-only block that checks the core math it teaches (bubble fraction, schedule memory, 3D layout, stage balance) against edge and adversarial cases.
What it is¶
Pipeline parallelism partitions a model by depth: contiguous blocks of layers become stages, each placed on a different device (or node). A forward activation flows stage 0 -> stage 1 -> ... -> stage N-1; the backward pass flows in reverse. To keep every stage busy, the global batch is split into micro-batches that are pumped through the pipeline so that while stage 1 processes micro-batch 0, stage 0 already works on micro-batch 1.
The cost is the pipeline bubble: idle time at fill (start) and drain (end) when not all stages have work. Bubble fraction shrinks as the number of micro-batches m grows relative to the number of stages p (roughly (p-1)/m for a fill-drain schedule). PP is inter-stage point-to-point (send/recv of a single activation tensor), so it is the most bandwidth-frugal parallelism axis, unlike TP's per-layer all-reduce.
Why use it¶
- Fit a model that exceeds one node. Sharding by layer caps per-device parameter + optimizer memory; an N-stage split holds ~1/N of the weights per device.
- Cheap interconnect tolerance. Stage boundaries exchange one activation per micro-batch, so PP runs acceptably over InfiniBand/RoCE between nodes where TP would stall (networking fabric, performance tuning).
- Composes orthogonally. PP across nodes + TP inside each node + DP/FSDP across replicas is the standard 3D layout for frontier-scale pretraining (distributed training).
When to use it (and when not)¶
- Use PP when the model (params + activations + optimizer state) does not fit even with FSDP sharding on a single node, i.e. the model must span nodes. Pair it with TP intra-node.
- Prefer TP alone (tensor parallelism) when the model fits in one NVLink island, since TP has no bubble and lower latency on fast links.
- Prefer FSDP/HSDP alone (FSDP) when memory pressure is moderate; FSDP needs no model-surgery into stages.
- Avoid PP when you cannot supply many micro-batches (small global batch); the bubble dominates and MFU collapses (the MFU-regression runbook).
Architecture¶
flowchart LR
subgraph N0["Node 0 (NVLink island)"]
S0["Stage S0: layers 0-7"]
end
subgraph N1["Node 1 (NVLink island)"]
S1["Stage S1: layers 8-15"]
end
subgraph N2["Node 2 (NVLink island)"]
S2["Stage S2: layers 16-23"]
end
MB["Micro-batches m0..m7"] -->|"feed"| S0
S0 -->|"activation (IB)"| S1
S1 -->|"activation (IB)"| S2
S2 -.->|"grad (reverse)"| S1
S1 -.->|"grad (reverse)"| S0
S2 --> OUT["Loss / bubble at fill+drain"]
How to use it¶
torch.distributed.pipelining (in-tree since PyTorch 2.x, still marked alpha; expect API churn, pin your version) is the native API: split the model into PipelineStage objects, wrap them in a schedule, and call schedule.step() which auto-splits the batch into micro-batches.
# REFERENCE TEMPLATE (needs torch + CUDA + torchrun; not executed here).
# pp_min.py: torchrun --nproc_per_node=<p> pp_min.py (one stage per rank)
import torch, torch.distributed as dist
from torch.distributed.pipelining import PipelineStage, ScheduleGPipe
dist.init_process_group("nccl")
rank, world = dist.get_rank(), dist.get_world_size()
dev = torch.device(f"cuda:{rank}")
stage_mod = build_stage_for_rank(rank, world).to(dev) # your layer slice
stage = PipelineStage(stage_mod, stage_index=rank, num_stages=world, device=dev)
sched = ScheduleGPipe(stage, n_microbatches=32, loss_fn=torch.nn.CrossEntropyLoss())
if rank == 0:
sched.step(x) # only first stage gets the input batch
elif rank == world - 1:
loss = sched.step(target=y) # only last stage gets the targets / loss
else:
sched.step()
dist.destroy_process_group()
The core quantity that template buys or loses is the bubble. This numpy-only block simulates a GPipe fill-drain pipeline as a discrete-event schedule and proves the makespan is exactly m + p - 1, so the relative bubble equals (p-1)/m. It checks the single-stage and single-micro-batch edges and the m >> p rule of thumb.
import numpy as np
def gpipe_makespan(p: int, m: int) -> int:
# Exact discrete-event sim. Stage s finishes microbatch i at
# max(prev stage done this mb, this stage done prev mb) + 1 time unit.
assert p >= 1 and m >= 1
finish = np.zeros((p, m), dtype=np.int64)
for s in range(p):
for i in range(m):
prev_stage = finish[s - 1, i] if s > 0 else 0
prev_mb = finish[s, i - 1] if i > 0 else 0
finish[s, i] = max(prev_stage, prev_mb) + 1
return int(finish[p - 1, m - 1])
def bubble_fraction(p: int, m: int, v: int = 1) -> float:
# Relative-to-ideal bubble for a fill-drain / 1F1B schedule with v chunks.
assert p >= 1 and m >= 1 and v >= 1
return (p - 1) / (m * v)
# 1) The exact simulation matches makespan == m + p - 1 over a grid, and the
# simulated relative bubble equals the closed form (p-1)/m everywhere.
for p in range(1, 9):
for m in range(1, 33):
mk = gpipe_makespan(p, m)
assert mk == m + p - 1, (p, m, mk)
sim_rel_bubble = (mk - m) / m # ideal busy time per stage = m
assert abs(sim_rel_bubble - bubble_fraction(p, m)) < 1e-12, (p, m)
# 2) Edge: a single stage has no bubble at all.
assert bubble_fraction(1, 32) == 0.0
assert gpipe_makespan(1, 32) == 32
# 3) Adversarial worst case: one microbatch leaves the pipeline (p-1)-idle.
assert bubble_fraction(8, 1) == 7.0
assert gpipe_makespan(8, 1) == 8
# 4) Monotone: more microbatches strictly shrink the bubble.
fracs = [bubble_fraction(8, m) for m in (1, 2, 4, 8, 16, 32, 64)]
assert all(a > b for a, b in zip(fracs, fracs[1:])), fracs
# 5) Rule of thumb m >> p keeps the bubble under 5% (here 3/64).
assert bubble_fraction(4, 64) < 0.05
print("v1 bubble OK: makespan==m+p-1 for p<=8,m<=32; sim==(p-1)/m; edges pass")
How to integrate with it¶
Two design choices drive a pipeline: how you cut the model into stages, and which schedule pumps the micro-batches. Swap ScheduleGPipe for Schedule1F1B (steady-state memory bound) or ScheduleInterleaved1F1B (multiple model chunks per rank, which shrinks the bubble).
# REFERENCE TEMPLATE (needs torch; not executed here).
from torch.distributed.pipelining import Schedule1F1B, ScheduleInterleaved1F1B
# 1F1B: one forward then one backward in steady state -> bounded activation memory
sched = Schedule1F1B(stage, n_microbatches=32, loss_fn=loss_fn)
# Interleaved 1F1B: each rank owns several non-contiguous chunks (virtual stages)
stages = [PipelineStage(chunk, i, num_stages=total_chunks, device=dev)
for i, chunk in enumerate(my_chunks)] # >1 chunk per rank
sched = ScheduleInterleaved1F1B(stages, n_microbatches=64, loss_fn=loss_fn)
The two schedules buy different things. 1F1B does not reduce the bubble versus GPipe; it bounds the activation memory that GPipe lets grow with the micro-batch count. Interleaving is the axis that actually shrinks the bubble. The first numpy block below models peak live activation stashes (GPipe stores all m, 1F1B stores at most p) and checks the m == p boundary where the two tie. The second block confirms interleaving by v virtual chunks divides the bubble by exactly v, tied back to the exact GPipe simulation at v = 1.
import numpy as np
def peak_stashes(schedule: str, p: int, m: int) -> int:
# First-order model of live activation stashes at the busiest (first) stage.
# GPipe forwards all m microbatches before any backward -> stashes = m.
# 1F1B warms up p microbatches then alternates fwd/bwd -> stashes = min(p, m).
assert schedule in ("gpipe", "1f1b") and p >= 1 and m >= 1
return m if schedule == "gpipe" else min(p, m)
p = 4
ms = np.array([4, 8, 16, 32, 64])
gpipe = np.array([peak_stashes("gpipe", p, m) for m in ms])
f1b = np.array([peak_stashes("1f1b", p, m) for m in ms])
# 1) GPipe peak grows linearly with m; 1F1B is flat at p (steady-state bound).
assert np.array_equal(gpipe, ms)
assert np.all(f1b == p)
# 2) Adversarial boundary: at m == p the two tie; 1F1B never exceeds GPipe.
assert peak_stashes("1f1b", p, p) == peak_stashes("gpipe", p, p)
assert np.all(f1b <= gpipe)
# 3) At many microbatches the saving is exactly m/p (here 64/4 = 16x).
assert peak_stashes("gpipe", p, 64) / peak_stashes("1f1b", p, 64) == 16.0
print(f"v2 memory OK: 1F1B bounded at p={p} stashes, GPipe grows as m; tie at m==p")
import numpy as np
def gpipe_makespan(p: int, m: int) -> int:
assert p >= 1 and m >= 1
finish = np.zeros((p, m), dtype=np.int64)
for s in range(p):
for i in range(m):
a = finish[s - 1, i] if s > 0 else 0
b = finish[s, i - 1] if i > 0 else 0
finish[s, i] = max(a, b) + 1
return int(finish[p - 1, m - 1])
def bubble_fraction(p: int, m: int, v: int = 1) -> float:
assert p >= 1 and m >= 1 and v >= 1
return (p - 1) / (m * v)
p, m = 4, 32
# 1) v=1 closed form matches the exact discrete-event GPipe simulation.
sim = (gpipe_makespan(p, m) - m) / m
assert abs(sim - bubble_fraction(p, m, 1)) < 1e-12
# 2) Interleaving by v virtual chunks divides the bubble by exactly v.
for v in (1, 2, 4):
assert abs(bubble_fraction(p, m, v) - bubble_fraction(p, m, 1) / v) < 1e-12
assert bubble_fraction(p, m, 2) < bubble_fraction(p, m, 1) # strictly smaller for p>1
# 3) Edge: a single stage has zero bubble regardless of interleaving.
assert bubble_fraction(1, m, 4) == 0.0
# 4) Adversarial: interleaving can never make the bubble negative or larger.
for v in (1, 2, 4, 8):
b = bubble_fraction(p, m, v)
assert 0.0 <= b <= bubble_fraction(p, m, 1)
print(f"v3 interleaved OK: bubble {bubble_fraction(p,m,1):.4f} (v1) -> {bubble_fraction(p,m,2):.4f} (v2)")
- More micro-batches -> smaller bubble but more activation memory in flight; tune
n_microbatchesagainst per-stage memory. - Balance stage compute (FLOPs) and parameter count so no stage straggles; the slowest stage sets the pace.
PP is one axis of a 3D layout. Run PP across nodes, TP inside each node, and DP or FSDP across replicas (distributed training). The data-parallel degree is whatever is left after tensor and pipeline claim their share, and the layout must divide the world exactly:
def dp_degree(world: int, tp: int, pp: int) -> int:
# Data-parallel degree is whatever GPUs remain after tensor x pipeline.
assert world > 0 and tp >= 1 and pp >= 1
assert world % (tp * pp) == 0, f"world {world} not divisible by TP*PP={tp * pp}"
return world // (tp * pp)
# Example A (production 3D): 64 nodes x 8 GPU = 512, TP=8, PP=4 -> DP=16.
world_a = 64 * 8
assert world_a == 512
assert dp_degree(world_a, 8, 4) == 16
assert 8 * 4 * dp_degree(world_a, 8, 4) == world_a # partition is exact
# Example B (cookbook): 4 nodes x 8 GPU = 32, TP=8, PP=2 -> DP=2.
world_b = 4 * 8
assert world_b == 32
assert dp_degree(world_b, 8, 2) == 2
assert 8 * 2 * 2 == world_b
# Adversarial: a mis-sized world (not divisible by TP*PP) must be rejected, not rounded.
raised = False
try:
dp_degree(500, 8, 4) # 500 / 32 is not integer
except AssertionError:
raised = True
assert raised, "expected non-divisible layout to be rejected"
print("v4 layout OK: 512=(8x4x16), 32=(8x2x2); non-divisible layout rejected")
How to run it in production¶
Megatron-LM is the production reference for combining all three (its 1F1B and interleaved schedules are the canonical implementations). A multi-node launch sets the tensor, pipeline, and virtual-pipeline sizes explicitly:
# Megatron-LM (multi-node): 3D = TP(8, intra-node) x PP(4, across nodes) x DP(rest)
torchrun --nnodes=64 --nproc_per_node=8 \
pretrain_gpt.py \
--tensor-model-parallel-size 8 \
--pipeline-model-parallel-size 4 \
--num-layers-per-virtual-pipeline-stage 2 \
--sequence-parallel \
--micro-batch-size 1 --global-batch-size 1024 \
--use-distributed-optimizer
# 64 nodes x 8 = 512 GPUs. DP degree = world_size / (TP * PP) = 512 / (8*4) = 16
Multi-instance / multi-node is the point of PP: it exists to let a model span more devices than one node holds. Place stage boundaries on the slower (inter-node) links and keep all-reduce-heavy TP on the fast (intra-node) fabric.
Serving and inference¶
PP is used to serve models too large for one node, with successive stages on successive nodes and requests streamed as micro-batches to amortise the bubble. For LLM serving the disaggregated/expert-parallel patterns in disaggregated inference and the serving engines in inference serving/serving open-weight models are usually preferred (vLLM/SGLang favour TP + EP + PP combinations); pipeline-parallel serving shines for single huge dense models where one node lacks the memory. See those pages for production serving.
Fine-tuning¶
Large-model fine-tuning reuses the same stage split: full-parameter fine-tuning of a model that spans nodes runs PP across nodes + TP within, identical to pretraining. Parameter-efficient methods (SFT and LoRA) usually remove the need for PP by shrinking the trainable/optimizer footprint to fit under FSDP. Post-training methods, RL layouts, and recipes are covered in fine-tuning and post-training and distributed-training recipes.
Cookbook (common use cases)¶
1. Two-stage PP on one node (smoke test, torch.distributed.pipelining)
torchrun --standalone --nproc_per_node=2 pp_min.py # ScheduleGPipe, 2 stages
# expect: rank 0 holds first half, rank 1 holds second half + loss
2. Switch to 1F1B for activation-memory-bound runs
# REFERENCE TEMPLATE (needs torch; not executed here).
# drop-in for the schedule line in pp_min.py
from torch.distributed.pipelining import Schedule1F1B
sched = Schedule1F1B(stage, n_microbatches=32, loss_fn=torch.nn.CrossEntropyLoss())
The swap is a pure memory win: 1F1B keeps the same bubble (p-1)/m as GPipe and only cuts the peak activation stashes. This numpy block encodes that invariant (a guard against the common belief that 1F1B shrinks the bubble) and sweeps it adversarially so the swap is never a throughput or memory regression:
def bubble_for(sched: str, p: int, m: int) -> float:
# Fill-drain family. GPipe and plain 1F1B both run at v=1; only interleaving
# (c virtual chunks per rank) raises the effective v and shrinks the bubble.
v = {"gpipe": 1, "1f1b": 1, "interleaved-1f1b(c=2)": 2}[sched]
assert p >= 1 and m >= 1
return (p - 1) / (m * v)
def peak_stashes(sched: str, p: int, m: int) -> int:
# GPipe stashes all m; both 1F1B variants stash at most p.
return m if sched == "gpipe" else min(p, m)
p, m = 4, 32
# The drop-in swap GPipe -> 1F1B is a pure memory win: identical bubble (this is
# the common misconception guard), strictly fewer live activation stashes.
assert bubble_for("1f1b", p, m) == bubble_for("gpipe", p, m) # 1F1B does NOT cut bubble
assert peak_stashes("1f1b", p, m) < peak_stashes("gpipe", p, m) # 32 -> 4 stashes
# Only interleaving cuts the bubble.
assert bubble_for("interleaved-1f1b(c=2)", p, m) < bubble_for("1f1b", p, m)
# Adversarial sweep: the swap is never a throughput regression nor a memory regression.
for mm in (1, 4, 8, 32, 128):
assert bubble_for("1f1b", p, mm) == bubble_for("gpipe", p, mm)
assert peak_stashes("1f1b", p, mm) <= peak_stashes("gpipe", p, mm)
print("v6 swap OK: 1F1B keeps bubble (p-1)/m, cuts stashes 32->4; only interleave cuts bubble")
3. 3D layout (TP intra-node + PP inter-node + DP) in Megatron-LM
# 4 nodes x 8 GPU = 32. TP=8 (NVLink), PP=2 (IB across node pairs), DP=2.
torchrun --nnodes=4 --nproc_per_node=8 pretrain_gpt.py \
--tensor-model-parallel-size 8 --pipeline-model-parallel-size 2 \
--global-batch-size 512 --micro-batch-size 1 \
--num-layers-per-virtual-pipeline-stage 1 # interleaved -> smaller bubble
How to maintain it¶
The pipeline runs at the pace of its slowest stage, so the ongoing job is to keep stages balanced and the bubble small. Rebalance when a stage straggles, and balance FLOPs and parameters, not layer count: the depth cut must give each stage roughly 1/p of the work. This numpy block builds a contiguous, parameter-balanced depth split, checks it loses no layer (corruption guard), holds each stage within 5 percentage points of 1/p, and shows param-balancing beats a naive equal-layer-count cut on a skewed stack.
import numpy as np
def partition_stages(layer_params: list[int], p: int) -> list[list[int]]:
# Contiguous depth split into p non-empty stages, balanced by parameter count.
assert p >= 1 and len(layer_params) >= p
n = len(layer_params)
prefix = np.cumsum([0] + list(layer_params)).astype(float)
total = prefix[-1]
stages: list[list[int]] = []
start = 0
for s in range(p):
remaining = p - s
if s == p - 1:
end = n # last stage takes the tail
else:
target_cum = prefix[start] + (total - prefix[start]) / remaining
end = start + 1
while end < n and prefix[end] < target_cum and (n - end) > (remaining - 1):
end += 1
end = min(end, n - (remaining - 1)) # leave >=1 layer per stage left
end = max(end, start + 1)
stages.append(list(range(start, end)))
start = end
return stages
rng = np.random.default_rng(0)
layer_params = rng.integers(80, 120, size=48).tolist() # 48 near-equal layers
p = 4
stages = partition_stages(layer_params, p)
# 1) Partition is exact: every layer used once, contiguous, correct stage count.
flat = [i for st in stages for i in st]
assert flat == list(range(len(layer_params))) # contiguous + complete
assert len(stages) == p and all(len(st) >= 1 for st in stages)
# 2) Each stage holds ~1/p of the parameters (within 5%).
stage_w = np.array([sum(layer_params[i] for i in st) for st in stages])
assert stage_w.sum() == sum(layer_params) # no params lost
share = stage_w / stage_w.sum()
assert np.all(np.abs(share - 1.0 / p) < 0.05)
# 3) Adversarial: balancing by params beats naive equal-layer-count on a skewed stack.
skew = [1] * 40 + [1000] * 8 # last 8 layers dominate
bal_w = np.array([sum(skew[i] for i in st) for st in partition_stages(skew, p)])
naive_w = np.array([blk.sum() for blk in np.array_split(np.array(skew), p)])
assert bal_w.max() / bal_w.mean() < naive_w.max() / naive_w.mean()
print(f"v5 partition OK: shares={np.round(share,3).tolist()} ~1/{p}; balanced beats naive on skew")
- GPUDirect RDMA on the inter-stage send/recv keeps the activation copy off the host; confirm
[GDRDMA]inNCCL_DEBUG=INFOand keep PCIe ACS off. - Bubble is the dominant overhead, not bandwidth. Raise
n_microbatchesand use interleaved 1F1B before reaching for a faster fabric. - Watch MFU. Too few micro-batches tanks it; the diagnosis path is the MFU-regression runbook.
How to scale it¶
Across nodes, PP is the outer axis carried over IB/RoCE; TP stays inside each node on NVLink. This is the whole reason PP exists: to let a model span more devices than one node holds.
- PP tolerates slower inter-node links. A stage boundary exchanges only one activation tensor per micro-batch, so PP is the right axis to carry between nodes over InfiniBand/RoCE, while TP (all-reducing every layer) stays on NVLink/NVSwitch inside the node.
- Cut the bubble before the fabric. At high stage counts, interleaved 1F1B (virtual chunks per rank) divides the bubble by the chunk count, as the interleaving check above proves; reach for it before a faster interconnect.
- On Blackwell, FP8/NVFP4 activations halve the inter-stage payload; pair with sequence parallelism to cut activation memory per stage (the Blackwell platform).
Failure modes¶
- Too few micro-batches -> bubble dominates, MFU tanks. Keep
m >> p(the MFU-regression runbook). - Unbalanced stages -> the heaviest stage gates the whole pipeline; balance FLOPs and params, not just layer count.
- Last-stage-only loss / first-stage-only input wired wrong ->
schedule.step()raises or silently no-ops. The first rank passes inputs, the last passes targets. - PP put on NVLink while TP spans nodes -> inverted layout, TP all-reduce stalls on IB. Always TP intra-node, PP inter-node (tensor parallelism).
- Activation memory blow-up with GPipe at high
m-> switch to 1F1B (bounds in-flight activations). - Uneven
n_microbatchesvs interleaving chunks -> scheduler asserts; chunk count and micro-batch count must be consistent.
References¶
- PyTorch
torch.distributed.pipelining: https://docs.pytorch.org/docs/stable/distributed.pipelining.html - PyTorch pipelining tutorial: https://docs.pytorch.org/tutorials/intermediate/pipelining_tutorial.html
- Megatron-LM (NVIDIA): https://github.com/NVIDIA/Megatron-LM · Megatron-Core pipeline_parallel: https://docs.nvidia.com/megatron-core/developer-guide/latest/api-guide/index.html
- GPipe (Huang et al., 2018): https://arxiv.org/abs/1811.06965
- PipeDream / 1F1B (Harlap et al., 2018): https://arxiv.org/abs/1806.03377
- Efficient Large-Scale LM Training on GPU Clusters (Megatron, interleaved schedule): https://arxiv.org/abs/2104.04473
Related: Distributed Training · Tensor Parallel · FSDP · Networking Fabric · Perf Optimization · Training Recipes · Glossary