Data loading pipeline tuning¶
Scope: keeping GPUs fed from the host input pipeline. Sizing num_workers and prefetch_factor, pin_memory + non_blocking=True H2D copies, persistent_workers, sharded streaming formats (WebDataset/tar, Parquet/Arrow), avoiding metadata storms and the small-file problem, and diagnosing input starvation on a profiler timeline. NUMA/CPU binding of those workers lives in NUMA affinity and CPU pinning for GPU pipelines; device-direct storage in GPUDirect Storage.
The prefetch-queue model, the wait-fraction speedup, the sequential-versus-random read amortization, and the H2D overlap timing below are executed and asserted with numpy (system python3). The PyTorch code blocks (DataLoader, pinned-copy stream, WebDataset, next(iterator) timing, torch.cuda.Event H2D timer) require torch or webdataset (not installed here); they are labelled reference templates, and the core math each one relies on is validated separately in a numpy block that is run and asserted.
What it is¶
The input pipeline is the chain that turns bytes at rest into batched tensors in GPU memory: read from storage, decode/deserialize (parse JSON, decode JPEG), transform (tokenize, crop), collate into a batch, then copy host to device (H2D). In PyTorch this is Dataset + DataLoader; the loader fans the per-sample work out to num_workers subprocesses (separate processes, to dodge the Python GIL), each prefetching ahead into a queue that the main process drains.13
"Tuning" the pipeline means making each stage produce batch N+1 while the GPU computes batch N, so the device never blocks on data. The book's framing: the typical loading process is read, then decode, then transform, then collate, all CPU-intensive (and offloadable to the GPU when compute-heavy), and the goal is that "GPUs are never idle waiting for new data."1
This page is the host-software complement to two neighbours: raw device throughput is the storage layer's job (Storage and data platform, GPUDirect Storage), and locality of the worker processes is NUMA's (NUMA affinity and CPU pinning for GPU pipelines). Here the concern is parallelism, prefetch depth, copy overlap, and on-disk layout.
Why use it¶
A poorly tuned input pipeline is the cheapest way to waste the most expensive hardware. The book is blunt: "A poorly tuned input pipeline could waste 50% of your GPU time, whereas algorithmic optimizations might give only a few percent," and "In many cases, optimizing the data pipeline yields more improvement than any algorithmic tweak."1
The failure mode is input starvation: GPU utilization drops below target and the compute timeline shows gaps at the start of each iteration while the loader catches up. The book's worked example: a pipeline where GPUs spend 30% of their time waiting for data is I/O-limited; after tuning down to 5% wait, training steps/sec rise proportionally, a 6x reduction in wasted GPU time in that case.1 (The "How to scale it" section reproduces this with a runnable model and is careful to separate the 6x wasted-time reduction from the resulting throughput multiplier.)
Three forces make this worse over time:
- Scaling out. Add GPUs without widening the pipeline and the bottleneck shifts onto data loading. "Scaling out compute without scaling out the ingestion pipeline resources will shift the bottleneck even further toward the data loading pipeline." Each data-parallel rank reads a distinct shard, so aggregate loader demand scales with cluster size.1 See Distributed training recipes.
- Faster GPUs. Blackwell/Rubin-class devices demand more bytes/sec, so per-GPU data rate (for example the book's ~200 MB/s/GPU figure, ~1.6 GB/s for 8 GPUs, 14 to 20 GB/s for a 72-GPU NVL72 rack) only grows.1
- Small random reads. Storage and GPUs both prefer large sequential reads; "excessive small random reads will more quickly become a bottleneck" on faster GPUs.1
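The per-GPU figures in the list above compose by plain multiplication. A minimal sketch, assuming the book's ~200 MB/s/GPU as a flat rate (the function name and the flat-rate assumption are illustrative; real demand depends on the model and sample size):

```python
def aggregate_ingest_gb_s(num_gpus: int, per_gpu_mb_s: float = 200.0) -> float:
    """Aggregate read bandwidth (GB/s) the input pipeline must sustain.

    Assumes every GPU consumes data at the same flat rate (an assumption).
    """
    if num_gpus < 1 or per_gpu_mb_s <= 0:
        raise ValueError("num_gpus must be >= 1 and per_gpu_mb_s > 0")
    return num_gpus * per_gpu_mb_s / 1000.0


for gpus in (1, 8, 72):
    print(f"{gpus:>3} GPUs -> {aggregate_ingest_gb_s(gpus):.1f} GB/s")
```

At 72 GPUs the flat rate gives 14.4 GB/s, the low end of the quoted 14 to 20 GB/s range; the upper end presumably reflects a higher per-GPU rate.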
When to use it (and when not)¶
Tune the pipeline when:
- GPU utilization is below target and the profiler timeline shows compute gaps at iteration boundaries (the starvation signature, not a sync/all-reduce gap).
- CPU utilization is pinned (for example the book's "800% CPU = 8 cores at 100%" image/video decode case). The host cannot keep up.
- You are scaling to more GPUs/nodes; size the pipeline up in lockstep.
- Your dataset is millions of individual files (the small-file / metadata-storm regime, below).
Do not over-invest when:
- The workload is genuinely compute-bound with a trivial input (synthetic data, on-GPU generation); the CPU sits idle anyway. Confirm with the roofline classification before tuning.
- The bottleneck is the device path (H2D copy or storage bandwidth), not Python/worker throughput. Those are fixed by pin_memory / GDS, not by adding workers. The diagnosis section below disambiguates.
- A higher abstraction already owns it: NVIDIA DALI / NeMo Curator move decode/curation off the critical path; see "How to integrate it" below.
There is no universal worker count. "Finding the right number of workers is empirical": too few starve the GPU, too many contend for CPU cores and I/O bandwidth.1
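Because the worker count is empirical, one way to pick it is to sweep candidates and keep the smallest one that is close to the best measured throughput. The sketch below is a hypothetical helper, not a PyTorch API: `measure` stands in for whatever benchmark you run per candidate, and the 5% tolerance and the throughput curve are made-up illustration values.

```python
from typing import Callable, Iterable


def pick_num_workers(measure: Callable[[int], float],
                     candidates: Iterable[int],
                     tolerance: float = 0.05) -> int:
    """Smallest worker count whose throughput is within `tolerance` of the best."""
    results = {n: measure(n) for n in candidates}
    if not results:
        raise ValueError("no candidates given")
    best = max(results.values())
    good_enough = [n for n, rate in results.items()
                   if rate >= (1.0 - tolerance) * best]
    return min(good_enough)


def fake_measure(n: int) -> float:
    """Hypothetical batches/sec curve: gains flatten, then contention hurts."""
    curve = {1: 12.0, 2: 23.0, 4: 44.0, 8: 61.0, 16: 62.0, 32: 55.0}
    return curve[n]


print(pick_num_workers(fake_measure, [1, 2, 4, 8, 16, 32]))
```

Preferring the smallest near-optimal count leaves CPU headroom, which matches the "too many contend for cores and I/O" warning.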
Architecture¶
The pipeline is a chain of producers with a bounded queue between the loader and the GPU. num_workers feeder subprocesses read, decode, transform, and collate in parallel; they push completed batches into an in-flight queue of depth num_workers * prefetch_factor; the main process pops the head batch, pins it (if pin_memory=True), and issues an asynchronous H2D copy on a side stream so the transfer overlaps the previous batch's compute. The device only stalls when it drains the queue faster than the workers refill it (the Python/worker side) or when the H2D copy cannot overlap (the device side). The diagnosis is deciding which of those two, if either, is the wall.
flowchart LR
subgraph STORE["Storage (shards, not scattered files)"]
SH["WebDataset tar / Parquet / Arrow shards"]
end
subgraph WORKERS["num_workers feeder subprocesses (dodge the GIL)"]
W0["read -> decode -> transform -> collate"]
W1["read -> decode -> transform -> collate"]
end
SH -->|"sequential reads, 1 shard/worker"| W0
SH --> W1
W0 -->|"push batch"| Q["Prefetch queue<br/>depth = num_workers x prefetch_factor"]
W1 --> Q
Q -->|"pop head batch"| PIN["pin_memory: page-locked host buffer"]
PIN -->|".to(cuda, non_blocking=True) on copy stream"| GPU["GPU compute (batch N)<br/>overlaps H2D of batch N+1"]
GPU -.->|"idle gap at iteration start = starvation"| Q
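The bounded-queue structure in the diagram can be sketched with the standard library alone. This is an illustration of the producer/consumer shape, not how DataLoader is implemented: it uses threads rather than subprocesses, and `run_pipeline` and its tuple "batches" are hypothetical stand-ins.

```python
import queue
import threading


def run_pipeline(num_workers: int, prefetch_factor: int, batches_per_worker: int):
    """Feed a bounded queue from worker threads and drain it in the caller."""
    q: queue.Queue = queue.Queue(maxsize=num_workers * prefetch_factor)
    done = object()  # sentinel each worker pushes when it has nothing left

    def worker(worker_id: int) -> None:
        for i in range(batches_per_worker):
            q.put((worker_id, i))  # blocks while the queue is full (back-pressure)
        q.put(done)

    threads = [threading.Thread(target=worker, args=(w,)) for w in range(num_workers)]
    for t in threads:
        t.start()

    consumed, finished = [], 0
    while finished < num_workers:
        item = q.get()  # blocks while the queue is empty: this is the starvation stall
        if item is done:
            finished += 1
        else:
            consumed.append(item)
    for t in threads:
        t.join()
    return consumed


batches = run_pipeline(num_workers=2, prefetch_factor=2, batches_per_worker=5)
print(len(batches))
```

The `maxsize` bound is what keeps fast workers from running arbitrarily far ahead of the consumer, and the blocking `get` is where a slow loader shows up as idle time.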
How to use it: size the loader (num_workers, prefetch_factor, the queue)¶
1. Baseline the loader in isolation (before touching knobs)¶
Measure the loader with all downstream GPU work disabled. Time how long it takes to produce a fixed number of batches, then compare to your target iteration time and to measured GPU-idle time.1
Reference template (needs torch, not installed here; the queue math it depends on is validated in the numpy block under step 2 below):
# REFERENCE TEMPLATE (requires torch; not runnable in this environment).
from __future__ import annotations
import time
import torch
from torch.utils.data import DataLoader
def time_loader(loader: DataLoader, n_batches: int = 100) -> float:
    """Wall-clock seconds to pull n_batches with no GPU work. Lower = faster pipeline."""
    it = iter(loader)
    next(it)  # warm up workers / first prefetch; exclude startup from timing
    t0 = time.perf_counter()
    for i, _ in enumerate(it, start=1):
        if i >= n_batches:
            break
    return time.perf_counter() - t0
Caveat from the book: disabling GPU kernels also removes CPU-side kernel-launch overhead, so this "pure" loader throughput reads higher (faster) than what a real run sees. Still useful, just do not treat it as the absolute ceiling.1
2. Set num_workers, prefetch_factor, and the queue depth¶
Reference template (needs torch):
# REFERENCE TEMPLATE (requires torch; not runnable in this environment).
loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=8,            # parallel feeder processes; empirical, scale with GPUs
    pin_memory=True,          # stage batches into page-locked host memory for fast DMA
    persistent_workers=True,  # keep workers alive across epochs (avoid re-fork cost)
    prefetch_factor=4,        # batches loaded ahead PER worker (default 2 when workers>0)
)
Exact documented semantics (do not guess these):
- num_workers: "how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)".3 With 0, there is no prefetch and the loop blocks on every batch.
- prefetch_factor: "Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default ... None if num_workers=0, otherwise 2)".3 The in-flight queue depth is therefore num_workers * prefetch_factor batches. The book: raise it "to 4 or 8" for bursty I/O or occasional worker starvation.1
- persistent_workers: "If True, the data loader will not shut down the worker processes after a dataset has been consumed once. (default: False)".3 The book recommends it to avoid re-fork/teardown each epoch, "most effective when you loop over the same dataset many times, especially if these iterations (aka epochs) are very short."1
Sizing rule of thumb from the book: "Ideally, you want near 100% utilization of disk throughput and some headroom on CPU." High-core hosts (for example the 72-core Grace CPU on GB200/GB300) can drive more workers, but watch for diminishing returns from I/O contention.1 For multi-core feed-side binding so these workers stay local to their GPU, see NUMA affinity and CPU pinning for GPU pipelines.
Validated core math (numpy, system python3). The knobs above set the in-flight queue depth Q = num_workers * prefetch_factor, which is exactly what decides whether the GPU starves. This model reproduces the behaviour: a fast loader never waits, a slow one starves in steady state at batch_ms - step_ms per iteration, deeper prefetch buys more free steps up front but not a lower asymptote, and a structurally different producer/consumer simulator agrees over 300 random configurations.
# Prefetch-queue starvation model, validated (system python3, numpy). Run: python3 prefetch.py
import numpy as np
def per_step_wait(step_ms, batch_ms, num_workers, prefetch_factor, n_steps):
    """GPU wait (ms) on each of n_steps iterations under a bounded prefetch queue.

    Model matching the DataLoader knobs the page teaches:
      * `num_workers` feeders each take `batch_ms` to produce one batch, so
        batches arrive in BURSTS of `num_workers` every `batch_ms`.
      * At t=0 the queue is prefilled with Q = num_workers * prefetch_factor
        ready batches (the documented in-flight depth).
      * The GPU consumes one batch per step: it waits until the head batch is
        ready, then computes for step_ms.
    Batch k (0-indexed) is ready at 0 for the first Q; batch Q+j lands in burst
    (j // num_workers + 1), i.e. at ceil((j+1)/num_workers) * batch_ms.
    """
    assert num_workers >= 1 and prefetch_factor >= 1 and n_steps >= 1
    Q = num_workers * prefetch_factor
    ready = np.empty(n_steps, np.float64)
    for k in range(n_steps):
        if k < Q:
            ready[k] = 0.0
        else:
            burst = (k - Q) // num_workers + 1  # 1st burst after prefetch, etc.
            ready[k] = burst * batch_ms
    gpu_clock = 0.0
    waits = np.empty(n_steps, np.float64)
    for k in range(n_steps):
        wait = max(0.0, ready[k] - gpu_clock)
        waits[k] = wait
        gpu_clock += wait + step_ms
    return waits

def per_step_wait_ref(step_ms, batch_ms, num_workers, prefetch_factor, n_steps):
    """Independent reference, different control flow: run a producer clock that
    emits a burst of `num_workers` batches every batch_ms into a FIFO, and a
    consumer that blocks on an empty queue. Cross-checks per_step_wait."""
    Q = num_workers * prefetch_factor
    ready_fifo = [0.0] * Q  # the Q prefetched batches, ready at t=0
    prod_clock = 0.0
    gpu_t = 0.0
    consumed = 0
    waits = []
    for _ in range(n_steps):
        # ensure the batch we are about to consume has been produced.
        while consumed >= len(ready_fifo):
            prod_clock += batch_ms
            ready_fifo.extend([prod_clock] * num_workers)
        avail = ready_fifo[consumed]
        wait = max(0.0, avail - gpu_t)
        gpu_t += wait + step_ms
        consumed += 1
        waits.append(wait)
    return np.asarray(waits, np.float64)
# happy path: loader faster than GPU (burst spacing << step) => never starves.
w = per_step_wait(step_ms=50.0, batch_ms=40.0, num_workers=8,
prefetch_factor=4, n_steps=200)
assert np.allclose(w, 0.0), f"fast loader must never wait, got max {w.max()}"
# edge: single slow worker => steady-state wait = batch_ms - step per iteration.
ws = per_step_wait(step_ms=10.0, batch_ms=80.0, num_workers=1,
prefetch_factor=2, n_steps=60)
assert ws[0] == 0.0 and ws[-1] > 0.0 # prefetch cushions the start
assert abs(ws[-1] - 70.0) < 1e-6, ws[-1] # 80 - 10 = 70 ms/step
# edge: deeper prefetch delays the onset of starvation but not the steady rate.
shallow = per_step_wait(10.0, 80.0, 1, 2, 60)
deep = per_step_wait(10.0, 80.0, 1, 6, 60)
assert (deep == 0.0).sum() > (shallow == 0.0).sum() # more free steps up front
assert abs(deep[-1] - shallow[-1]) < 1e-6 # same asymptote
# adversarial: closed model == independent producer/consumer sim over 300 random configs.
rng = np.random.default_rng(11)
for _ in range(300):
    s, b = float(rng.uniform(5, 60)), float(rng.uniform(5, 120))
    nw, pf = int(rng.integers(1, 9)), int(rng.integers(1, 6))
    a = per_step_wait(s, b, nw, pf, 50)
    r = per_step_wait_ref(s, b, nw, pf, 50)
    assert np.allclose(a, r, atol=1e-6), (s, b, nw, pf, np.abs(a - r).max())
# the identity the page states: in-flight depth Q = num_workers * prefetch_factor
assert 8 * 4 == 32
print(f"fast loader max wait={w.max():.3f}ms; slow steady wait={ws[-1]:.1f}ms; "
f"deeper prefetch free-steps {int((deep==0).sum())}>{int((shallow==0).sum())}; "
f"Q(8x4)=32; 300-config producer/consumer agreement OK")
Running it prints fast loader max wait=0.000ms; slow steady wait=70.0ms; deeper prefetch free-steps 6>2; Q(8x4)=32; 300-config producer/consumer agreement OK and every assert passes. The takeaway the knobs encode: prefetch depth Q buys a cushion (free steps before starvation bites), but if the loader's steady-state throughput is below the GPU's consumption rate, no Q fixes it, you must widen the loader (more workers, cheaper per-sample work, or a faster on-disk layout).
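Under the same burst model (each of `num_workers` feeders delivers one batch every `batch_ms`, the GPU consumes one batch per `step_ms`), "widen the loader" has a simple steady-state reading: the loader keeps up once `num_workers * step_ms >= batch_ms`. The helper names below are hypothetical, and the result is only as good as the model's assumption that workers scale linearly with no I/O contention.

```python
import math


def min_workers_to_keep_up(step_ms: float, batch_ms: float) -> int:
    """Fewest workers whose combined rate matches the GPU's consumption rate."""
    if step_ms <= 0 or batch_ms <= 0:
        raise ValueError("step_ms and batch_ms must be positive")
    return max(1, math.ceil(batch_ms / step_ms))


def steady_wait_ms(step_ms: float, batch_ms: float, num_workers: int) -> float:
    """Average GPU wait per step in steady state; independent of prefetch depth."""
    if num_workers < 1:
        raise ValueError("num_workers must be >= 1")
    return max(0.0, batch_ms / num_workers - step_ms)


# The slow-loader case from the model above: 10 ms steps, 80 ms per batch.
print(steady_wait_ms(10.0, 80.0, num_workers=1))
print(min_workers_to_keep_up(10.0, 80.0))
print(steady_wait_ms(10.0, 80.0, num_workers=8))
```

With one worker the average wait is the 70 ms the model reports; eight workers close the gap, and prefetch depth never appears in either formula.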
How to run it in production: pin host memory and overlap the H2D copy¶
pin_memory=True stages each batch in CUDA page-locked memory; pair it with non_blocking=True on the copy so the transfer overlaps GPU compute on a separate stream. Without pinning, the copy cannot be truly asynchronous: the driver must first stage pageable data through a temporary page-locked buffer.
Reference template (needs torch):
# REFERENCE TEMPLATE (requires torch + CUDA; not runnable in this environment).
copy_stream = torch.cuda.Stream()
device = torch.device("cuda")
for batch in loader:
    with torch.cuda.stream(copy_stream):
        batch_gpu = batch.to(device, non_blocking=True)
    # Ensure the async H2D copy completes before compute consumes the buffer.
    torch.cuda.current_stream().wait_stream(copy_stream)
    outputs = model(batch_gpu)
    # loss / backward / optimizer step ...
pin_memory "will copy Tensors into device/CUDA pinned memory before returning them"; DMA from pinned, locked RAM avoids extra copies and page faults and "allows truly asynchronous .to(..., non_blocking=True) copies when the source is pinned".31 The book reports pinned to device copies 2 to 3x faster than pageable, with the flag alone worth up to 10% to 20% end-to-end on data-heavy loops.2
Large pinned buffers count against ulimit -l (max locked memory); set it high or unlimited or the allocation fails / falls back to swappable memory.1
ulimit -l unlimited # interactive shell
docker run --ulimit memlock=-1 ... # container: lift the memlock cap
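A quick way to see the limit the process actually inherited is Python's standard `resource` module (Unix only). This is a sketch under stated assumptions: the pinned-memory budget of 32 in-flight batches at 64 MiB each is a made-up figure, and `memlock_allows` is a hypothetical helper that only compares numbers; it does not allocate or pin anything.

```python
import resource


def memlock_allows(required_bytes: int, soft_limit: int) -> bool:
    """True if a locked-memory soft limit can hold `required_bytes`."""
    return soft_limit == resource.RLIM_INFINITY or required_bytes <= soft_limit


soft, hard = resource.getrlimit(resource.RLIMIT_MEMLOCK)

# Hypothetical budget: 32 in-flight batches of 64 MiB each.
needed = 32 * 64 * 1024 * 1024

print("memlock soft limit:",
      "unlimited" if soft == resource.RLIM_INFINITY else f"{soft} bytes")
print("budget fits:", memlock_allows(needed, soft))
```

Running this inside the training container, before the job starts, catches a low memlock cap early instead of at the first large pinned allocation.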
If you return a custom batch type (not a tensor or standard collection), define a pin_memory() method on it or the loader's default pinning logic will silently skip it.3
Validated core math (numpy, system python3). The reason pin_memory + non_blocking matters is that it turns a serial copy-then-compute step into a two-stage software pipeline where the H2D copy of batch N+1 overlaps the compute of batch N. This model reproduces that: overlap is limited by the slower of the two stages after fill, a single batch overlaps nothing, and shrinking the copy stage (the 3x pinned speedup) yields a real end-to-end win on a data-heavy loop. An independent per-stream event simulator agrees over 800 cases, and overlap is asserted to never be slower than serial.
# H2D copy/compute overlap timing, validated (system python3, numpy). Run: python3 h2d.py
import numpy as np
def pipeline_time(copy_ms, compute_ms, n_batches, overlap):
    """Wall-clock ms for n_batches through a copy(H2D)->compute pipeline.

    overlap=False (pageable / non_blocking off): copy and compute serialize,
        total = n * (copy + compute).
    overlap=True (pinned + non_blocking): copy of batch k+1 runs on a separate
        stream concurrently with compute of batch k, so after the first copy fills
        the pipeline the limit is the SLOWER stage each step, plus the final drain:
        total = copy + (n-1)*max(copy, compute) + compute.
    """
    assert copy_ms > 0 and compute_ms > 0 and n_batches >= 1
    if not overlap:
        return n_batches * (copy_ms + compute_ms)
    return copy_ms + (n_batches - 1) * max(copy_ms, compute_ms) + compute_ms

def pipeline_time_sim(copy_ms, compute_ms, n_batches, overlap):
    """Independent event simulator with explicit per-stream clocks. Different
    control flow; cross-checks pipeline_time."""
    copy_clock = 0.0  # when the copy stream is next free
    comp_clock = 0.0  # when the compute stream is next free
    finish = 0.0
    for k in range(n_batches):
        if not overlap:
            copy_done = copy_clock + copy_ms
            comp_done = copy_done + compute_ms
            copy_clock = comp_done  # everything serializes on one timeline
            comp_clock = comp_done
            finish = comp_done
        else:
            copy_done = copy_clock + copy_ms
            copy_clock = copy_done
            start_comp = max(copy_done, comp_clock)  # compute waits for input + its stream
            comp_done = start_comp + compute_ms
            comp_clock = comp_done
            finish = comp_done
    return finish
# happy path: overlap hides the smaller stage; here compute-bound (copy < compute).
serial = pipeline_time(copy_ms=4.0, compute_ms=10.0, n_batches=100, overlap=False)
ovl = pipeline_time(copy_ms=4.0, compute_ms=10.0, n_batches=100, overlap=True)
assert serial == 100 * 14.0 # 1400 ms, no overlap
assert abs(ovl - (4.0 + 99 * 10.0 + 10.0)) < 1e-9, ovl # 4 + 99*10 + 10 = 1004
assert ovl < serial and serial / ovl > 1.35 # ~1.39x here
# the page's pinned-copy claim: pinned->device copy is 2-3x faster than pageable.
# On a DATA-HEAVY loop (copy is a big fraction of the step) pinning the copy stage
# is where the page's "up to 10-20% end-to-end" win comes from. Use copy >= compute.
pageable_copy, pinned_copy, comp = 9.0, 3.0, 6.0 # 3x faster pinned copy
assert abs(pageable_copy / pinned_copy - 3.0) < 1e-9 # the 3x speedup, exactly
t_pageable = pipeline_time(pageable_copy, comp, 100, overlap=True) # copy-bound at 9
t_pinned = pipeline_time(pinned_copy, comp, 100, overlap=True) # compute-bound at 6
gain = 1.0 - t_pinned / t_pageable
assert 0.10 <= gain <= 0.40, gain # ~33% here; data-heavy regime
assert pipeline_time(pinned_copy, comp, 100, False) > t_pinned  # overlapped pinned copy beats serial pinned copy
# edge: copy-bound (copy > compute) => overlap limited by copy, compute hidden instead.
cb_serial = pipeline_time(copy_ms=12.0, compute_ms=3.0, n_batches=50, overlap=False)
cb_ovl = pipeline_time(copy_ms=12.0, compute_ms=3.0, n_batches=50, overlap=True)
assert abs(cb_ovl - (12.0 + 49 * 12.0 + 3.0)) < 1e-9 # bottleneck = copy = 12
assert cb_ovl < cb_serial
# edge/boundary: a single batch cannot overlap anything (fill == whole cost).
one_s = pipeline_time(4.0, 10.0, 1, overlap=False)
one_o = pipeline_time(4.0, 10.0, 1, overlap=True)
assert one_s == one_o == 14.0 # copy+compute, no hiding
# adversarial: closed form == independent event simulator over 400 random cases (x2 modes).
rng = np.random.default_rng(3)
for _ in range(400):
    cp, cm = float(rng.uniform(0.5, 20)), float(rng.uniform(0.5, 20))
    n = int(rng.integers(1, 64))
    for ov in (False, True):
        a = pipeline_time(cp, cm, n, ov)
        b = pipeline_time_sim(cp, cm, n, ov)
        assert abs(a - b) < 1e-6, (cp, cm, n, ov, a, b)
# adversarial: overlap can never be slower than serial (it is a valid schedule of it).
for _ in range(400):
    cp, cm = float(rng.uniform(0.5, 20)), float(rng.uniform(0.5, 20))
    n = int(rng.integers(1, 64))
    assert pipeline_time(cp, cm, n, True) <= pipeline_time(cp, cm, n, False) + 1e-9
print(f"serial={serial:.0f}ms overlap={ovl:.0f}ms ({serial/ovl:.2f}x); "
f"pinned-copy end-to-end gain={gain*100:.0f}%; "
f"copy-bound {cb_serial:.0f}->{cb_ovl:.0f}ms; single-batch no-overlap={one_o:.0f}ms; "
f"800-case sim+monotonicity agreement OK")
Running it prints serial=1400ms overlap=1004ms (1.39x); pinned-copy end-to-end gain=33%; copy-bound 750->603ms; single-batch no-overlap=14ms; 800-case sim+monotonicity agreement OK and every assert passes. This is why the book's 2 to 3x pinned copy shows up as a step-level gain only when the copy is a real fraction of the step; on a compute-bound loop the copy was already hidden and pinning buys little, which is exactly the "when not" caveat above.
How to integrate it: fix the on-disk layout (shard, do not scatter)¶
The single biggest win for large datasets is the storage format. "Avoid storing millions of individual image files since this will lead to lots of random seeks all over the disk. Consider, instead, storing them in a few large binary (for example Arrow, TFRecord, or Parquet) files ... WebDataset tar files, or equivalents," each holding many concatenated samples.1
Why this beats a directory of small files:
- Sequential vs random. Storage measures far higher throughput on large sequential reads; one chunk yields many samples in a single pass. Tune read size up (1 MB chunks beat 4 KB) to amortize per-read overhead.1
- Metadata storms / small-file problem. Millions of tiny files hammer the filesystem metadata service (per-file open/stat), which a parallel FS or object store serves poorly at scale. Packing into shards collapses millions of metadata ops into a handful.
Validated core math (numpy, system python3). The claim is that packing samples into large shards is strictly faster because it amortizes a fixed per-read overhead (seek + metadata op) over many samples and reads at sequential rather than random bandwidth. This model reproduces it: a directory of a million files is far slower than 256 MB shards, 1 MB chunks pay 256x fewer per-read overheads than 4 KB chunks, read time is monotone non-increasing in chunk size, and with zero overhead and equal bandwidth the layout stops mattering (isolating the mechanism).
# Sequential-vs-random read + small-file amortization, validated (system python3, numpy).
# Run: python3 sharding.py
import numpy as np
def read_time_s(n_samples, sample_bytes, chunk_bytes, seq_bw, rand_bw,
                per_read_overhead_s):
    """Wall-clock seconds to read n_samples, each `sample_bytes`, when packed into
    chunks of `chunk_bytes`. Each chunk read pays a fixed `per_read_overhead_s`
    (seek + metadata `open`/`stat`) plus transfer at `seq_bw` for the bytes read;
    random single-sample reads instead transfer at the lower `rand_bw`.
    Larger chunks amortize the per-read overhead over more samples: fewer chunks =>
    fewer overhead payments and sequential (not random) bandwidth.
    """
    assert chunk_bytes >= sample_bytes and seq_bw >= rand_bw > 0
    samples_per_chunk = max(1, chunk_bytes // sample_bytes)
    n_chunks = int(np.ceil(n_samples / samples_per_chunk))
    total_bytes = n_samples * sample_bytes
    if samples_per_chunk == 1:  # scatter: one random read per sample
        return n_samples * per_read_overhead_s + total_bytes / rand_bw
    return n_chunks * per_read_overhead_s + total_bytes / seq_bw
# Fixed medium: NVMe-ish. overhead dominated by seek/metadata, not transfer.
SEQ_BW = 3.0e9 # 3 GB/s sequential
RAND_BW = 0.4e9 # 0.4 GB/s effective on tiny random reads
OVH = 100e-6 # 100 us per read (seek + open/stat metadata op)
SAMPLE = 50_000 # 50 KB per sample (e.g. a JPEG)
N = 1_000_000 # one million samples
# scatter: a directory of a million individual files (chunk == one sample).
scatter = read_time_s(N, SAMPLE, chunk_bytes=SAMPLE, seq_bw=SEQ_BW,
rand_bw=RAND_BW, per_read_overhead_s=OVH)
# shard: pack into ~256 MB shards (WebDataset/Parquet), sequential reads.
shard = read_time_s(N, SAMPLE, chunk_bytes=256 * 1024 * 1024, seq_bw=SEQ_BW,
rand_bw=RAND_BW, per_read_overhead_s=OVH)
# happy path: sharding is strictly faster (the page's central claim).
assert shard < scatter, (shard, scatter)
spc = (256 * 1024 * 1024) // SAMPLE
assert scatter > shard * 5, (scatter, shard)  # 187 shard reads vs 1,000,000 file reads => big win
# the overhead the two layouts pay differs by exactly the read-count ratio.
n_shards = int(np.ceil(N / spc))
assert abs((N * OVH) / (n_shards * OVH) - N / n_shards) < 1e-6
# edge: 1 MB chunk beats 4 KB chunk (the page's "tune read size up" claim).
def overhead_per_sample(chunk_bytes, sample_bytes, ovh):
    spc = max(1, chunk_bytes // sample_bytes)
    return ovh / spc
tiny = overhead_per_sample(4 * 1024, 512, OVH) # 4 KB chunks, 512 B samples
big = overhead_per_sample(1024 * 1024, 512, OVH) # 1 MB chunks
assert big < tiny
assert abs(tiny / big - (1024 * 1024) / (4 * 1024)) < 1e-6 # exactly 256x fewer reads
# edge/boundary: a chunk that holds exactly one sample equals the scatter path.
one = read_time_s(N, SAMPLE, chunk_bytes=SAMPLE, seq_bw=SEQ_BW,
rand_bw=RAND_BW, per_read_overhead_s=OVH)
assert one == scatter
# adversarial: read time is monotone non-increasing in chunk size (bigger never worse).
prev = float("inf")
for mult in (1, 2, 8, 64, 512, 4096, 65536):
    t = read_time_s(N, SAMPLE, chunk_bytes=SAMPLE * mult, seq_bw=SEQ_BW,
                    rand_bw=RAND_BW, per_read_overhead_s=OVH)
    assert t <= prev + 1e-12, (mult, t, prev)
    prev = t
# adversarial: with ZERO per-read overhead and equal bandwidth, layout must not matter
# (isolates that the win comes from overhead + bandwidth, not a bookkeeping bug).
flat_a = read_time_s(N, SAMPLE, SAMPLE * 100, seq_bw=SEQ_BW, rand_bw=SEQ_BW,
per_read_overhead_s=0.0)
flat_b = read_time_s(N, SAMPLE, SAMPLE, seq_bw=SEQ_BW, rand_bw=SEQ_BW,
per_read_overhead_s=0.0)
assert abs(flat_a - flat_b) < 1e-9
print(f"scatter={scatter:.1f}s shard={shard:.1f}s speedup={scatter/shard:.0f}x "
f"1MB-vs-4KB overhead/sample={tiny/big:.0f}x fewer reads; asserts pass")
Running it prints scatter=225.0s shard=16.7s speedup=13x 1MB-vs-4KB overhead/sample=256x fewer reads; asserts pass and every assert passes. The speedup magnitude depends on the medium's overhead and random/sequential bandwidth gap, but the direction is unconditional whenever a per-read overhead exists, which is the whole point of the small-file rule.
WebDataset (tar shards)¶
WebDataset packs samples into standard POSIX tar shards, named with brace notation, and is a plain PyTorch IterableDataset, drop-in with the standard DataLoader. It enables "purely sequential I/O pipelines for large scale deep learning"; shard-level splitting across workers/ranks (via get_worker_info) keeps reads sequential within each shard. The project reports an ~11% pipeline/throughput gain purely from switching to a sharded, sequentially readable format.4
Reference template (needs webdataset, not installed here; the sequential-read advantage it exploits is the model validated just above):
# REFERENCE TEMPLATE (requires the webdataset package; not runnable here).
import webdataset as wds
# 12346 shards: shards-000000.tar .. shards-012345.tar
url = "pipe:aws s3 cp s3://bucket/shards-{000000..012345}.tar -"
dataset = (
    wds.WebDataset(url, shardshuffle=100)  # shuffle shard order; cheap, sequential reads within a shard
    .shuffle(1000)                         # sample-level buffer shuffle
    .decode("rgb")                         # decode images
    .to_tuple("jpg", "cls")                # (image, label)
)
loader = wds.WebLoader(dataset, batch_size=32, num_workers=8, pin_memory=True)
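To make the shard layout concrete without the webdataset package, the sketch below packs samples into a tar with the standard `tarfile` module and streams them back front to back. It assumes the common convention that files sharing a basename (for example `000000.jpg` and `000000.cls`) form one sample; `write_shard` and `read_shard` are hypothetical helpers, not part of any library, and the payload bytes are fake.

```python
import io
import tarfile


def write_shard(samples: dict) -> bytes:
    """Pack {key: {ext: bytes}} into one in-memory tar shard."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for key in sorted(samples):
            for ext, payload in samples[key].items():
                info = tarfile.TarInfo(name=f"{key}.{ext}")
                info.size = len(payload)
                tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


def read_shard(shard: bytes):
    """Yield (key, {ext: bytes}) while streaming the tar sequentially."""
    current_key, fields = None, {}
    # Mode "r|" reads the archive as a forward-only stream: no seeking.
    with tarfile.open(fileobj=io.BytesIO(shard), mode="r|") as tar:
        for member in tar:
            payload = tar.extractfile(member).read()
            key, ext = member.name.split(".", 1)
            if current_key is not None and key != current_key:
                yield current_key, fields
                fields = {}
            current_key = key
            fields[ext] = payload
    if current_key is not None:
        yield current_key, fields


shard = write_shard({
    "000000": {"jpg": b"fake-jpeg-0", "cls": b"3"},
    "000001": {"jpg": b"fake-jpeg-1", "cls": b"7"},
})
for key, fields in read_shard(shard):
    print(key, sorted(fields))
```

Because the reader never seeks, the same loop works over a pipe or an object-store stream, which is the property that keeps shard reads sequential.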
Parquet / Arrow, and multi-node sharding¶
Parquet/Arrow are the columnar analogue for tabular/text and the on-disk form for many LLM corpora (the book notes NeMo Curator reads/writes sharded JSONL or Parquet, later converted to memory-mappable .bin/.idx). They also carry compression, trading I/O bandwidth for decode CPU/GPU, favourable only when I/O is the bottleneck and the decoder does not itself become one.1
For multi-node training, shard the dataset across nodes so each reads from local storage, and use PyTorch's DistributedSampler so each rank gets a unique slice per epoch.1 See Storage and data platform for striping (Lustre OSTs), mount tuning (noatime, rsize/wsize), and object-store staging.
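The "unique slice per rank, per epoch" idea can be sketched in plain Python. This mirrors the general approach (one shared per-epoch permutation, padded so every rank gets the same count, then strided by rank); it is an illustration, not the DistributedSampler source, and `rank_indices` and its seed handling are assumptions.

```python
import random


def rank_indices(n_samples: int, rank: int, world_size: int,
                 epoch: int, seed: int = 0) -> list:
    """Per-rank slice of a per-epoch shuffled index list, equal length on every rank."""
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    if n_samples < world_size:
        raise ValueError("need at least one sample per rank")
    order = list(range(n_samples))
    random.Random(seed + epoch).shuffle(order)  # identical permutation on every rank
    per_rank = -(-n_samples // world_size)      # ceil division
    total = per_rank * world_size
    order += order[: total - n_samples]         # pad by wrapping so ranks stay in lockstep
    return order[rank:total:world_size]


slices = [rank_indices(10, r, world_size=4, epoch=0) for r in range(4)]
print([len(s) for s in slices])
print(sorted(set(i for s in slices for i in s)) == list(range(10)))
```

Seeding with the epoch is what changes the slice each epoch while keeping all ranks in agreement; the padding duplicates a few samples so no rank finishes early and stalls a collective.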
Offload decode to the GPU (DALI) or curate offline (NeMo)¶
When the host is the wall (the book's image/video case at "800% CPU" with the GPU still stalling), move decode/augment onto the GPU. NVIDIA DALI decodes JPEG and applies crop/resize/normalize on the GPU via a declaratively defined static operator graph, with its own prefetch and threading; the book's example drops CPU from ~800% to ~200% by letting the GPU do the decode.15 See NVIDIA DALI: GPU data loading and augmentation.
Caveat the book stresses: if you use DALI only to JPEG-decode and then hand pixels back to the CPU for augmentation/collation, the extra host to device to host copies can negate the gain. Keep the whole preprocessing chain on the GPU, or fuse it into your model's preprocessing graph (TorchVision/TensorRT/custom kernels).1 Always benchmark CPU-only vs DALI vs fully-fused GPU graph end-to-end.
Better still, do the heavy lifting offline: "In general, prepare your data before training. You should almost never be training with raw text." Tools like NeMo Curator cleanse, tokenize, dedup, shuffle, and pack into a small number of large files ahead of time, leaving the online loader to do cheap last-mile work.1 One option to kill runtime shuffle cost: store N pre-shuffled copies, trading disk for per-epoch CPU.
How to diagnose input starvation on the profiler timeline¶
The decisive question: is the GPU idle because of (a) the Python/worker pipeline, or (b) the H2D copy, or (c) neither (it is compute/comms-bound)? Separate them. Do not add workers blindly.
Measure total wait. In PyTorch, next(data_iterator) measures total GPU-idle time waiting for the next batch, including background prefetch and the H2D copy, not just the Python logic.1
Reference template (needs torch):
# REFERENCE TEMPLATE (requires torch + CUDA; not runnable in this environment).
import time
import torch
it = iter(loader)
events = []
for step in range(num_steps):
    t0 = time.perf_counter()
    batch = next(it)  # total stall: prefetch queue + H2D
    wait_s = time.perf_counter() - t0
    batch = batch.to("cuda", non_blocking=True)
    # forward / backward / step ...
    events.append(wait_s)
# Mean / p99 wait vs your step time => fraction of GPU time lost to input.
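Turning the recorded waits into the "fraction of GPU time lost" number is simple arithmetic. The sketch below assumes you also record each step's total wall time (wait included), which the template above does not do; `wait_stats` is a hypothetical helper and the p99 uses the nearest-rank definition.

```python
from typing import Sequence


def wait_stats(wait_s: Sequence[float], step_s: Sequence[float]) -> dict:
    """Summarise input stall from per-step wait and total step wall time (seconds)."""
    if not wait_s or len(wait_s) != len(step_s):
        raise ValueError("need equal-length, non-empty sequences")
    ordered = sorted(wait_s)
    n = len(ordered)
    p99_index = (99 * n + 99) // 100 - 1  # nearest-rank: ceil(0.99 * n) - 1
    return {
        "wait_fraction": sum(wait_s) / sum(step_s),
        "mean_wait_s": sum(wait_s) / n,
        "p99_wait_s": ordered[p99_index],
    }


# Hypothetical run: 100 steps of 100 ms each, 30 ms of which is input wait.
stats = wait_stats([0.030] * 100, [0.100] * 100)
print(f"wait fraction = {stats['wait_fraction']:.2f}")
```

A mean that looks fine with a large p99 points at bursty stalls (the case where a deeper prefetch queue helps), while a uniformly high wait fraction points at loader throughput.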
Split the two costs:1
- DataLoader vs Python cost: re-run with num_workers=0 (no background prefetch) and time only the iterator pull. This isolates how long the Python loop + transforms take.
- Host to device copy cost: wrap the .to("cuda") in torch.cuda.Event timers, or read the "Copy" lanes in Nsight Systems, to quantify pure H2D stall.
Reference template (needs torch):
# REFERENCE TEMPLATE (requires torch + CUDA; not runnable in this environment).
start, end = torch.cuda.Event(enable_timing=True), torch.cuda.Event(enable_timing=True)
start.record()
batch_gpu = batch.to("cuda", non_blocking=True)
end.record()
torch.cuda.synchronize()
h2d_ms = start.elapsed_time(end) # milliseconds spent in the H2D copy alone
Read the timeline. In Nsight Systems, the starvation signature is GPU rows idle at iteration boundaries with gaps that line up against the loader, not against NCCL/all-reduce waits (those are a comms bottleneck; see Communication-computation overlap). The book: "If they are idle at the start of each iteration, perhaps data loading ... is the bottleneck."1 Host OS tools (iostat, iotop, nvme-cli) and DCGM complete the picture of whether you are saturating disk/NIC.1 See GPU diagnostics and validation, Observability and monitoring.
Act on the verdict:
- Loader (Python) slow: remove per-element logging, vectorize/batch transforms via collate_fn, use Rust/C++ tokenizers (HF Tokenizers, TorchText), add workers, or move decode to the GPU (DALI).
- H2D slow: confirm pin_memory + non_blocking, widen the interconnect, or use GDS to bypass the host bounce buffer entirely.
- Neither: it is compute- or comms-bound; this page is not your lever (roofline).
flowchart TD
GPUIDLE["GPU idle: compute timeline gaps"] --> WHERE{"Gap aligns with what?"}
WHERE -->|"Iteration start, loader queue"| INPUT["Input starvation"]
WHERE -->|"NCCL / all-reduce wait"| COMMS["Comms-bound (not this page)"]
WHERE -->|"None: GPU busy, low FLOPS"| COMPUTE["Compute / memory-bound"]
INPUT --> SPLIT{"num_workers=0 vs H2D timer"}
SPLIT -->|"Python loop slow"| FIXPY["Vectorize, Rust/C++ tokenizers, more workers, DALI"]
SPLIT -->|"H2D copy slow"| FIXCOPY["pin_memory + non_blocking, wider link, GDS"]
FIXPY --> SHARD["Shard layout: WebDataset / Parquet, kill small files"]
FIXCOPY --> SHARD
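The same decision tree can be written as a small function for use in a profiling script. This is a sketch under stated assumptions: `classify_gap`, its return labels and the 5% `min_gap` noise floor are hypothetical, and comparing the num_workers=0 loop time against the H2D timer is a rough heuristic rather than a method prescribed by the source.

```python
def classify_gap(input_wait, comms_wait, loader_s=0.0, h2d_s=0.0, min_gap=0.05):
    """Return which bottleneck the idle time points at.

    input_wait / comms_wait: fractions of step time idle at the loader and in
    collectives, read off the profiler timeline.
    loader_s / h2d_s: seconds per batch from the num_workers=0 run and the
    H2D timer. min_gap is an assumed noise floor, not a sourced value."""
    assert 0.0 <= input_wait < 1.0 and 0.0 <= comms_wait < 1.0
    assert input_wait + comms_wait < 1.0
    if max(input_wait, comms_wait) < min_gap:
        return "compute-or-memory-bound"
    if comms_wait > input_wait:
        return "comms-bound"
    # Input starvation: split Python/worker cost from copy cost.
    return "loader-slow" if loader_s >= h2d_s else "h2d-slow"

cases = {
    "tokenizer-heavy": classify_gap(0.30, 0.02, loader_s=0.120, h2d_s=0.008),
    "unpinned copies": classify_gap(0.20, 0.02, loader_s=0.004, h2d_s=0.030),
    "all-reduce wait": classify_gap(0.03, 0.25),
    "busy GPU": classify_gap(0.01, 0.02),
}
for name, verdict in cases.items():
    print(f"{name}: {verdict}")
```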
How to maintain it¶
Pipeline tuning is not one-shot. Hardware/library updates shift the optimum. Track samples/sec on a dashboard, run nightly profiled jobs to catch regressions, and re-baseline after upgrading PyTorch/CUDA/NCCL. The book's loop: run, then measure, then tune, then repeat, changing one or two things at a time so you know what helped, and documenting cluster-specific findings in code/config.1 For CI guardrails on throughput, see PyTorch performance regression testing in CI and Performance optimization and tuning.
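A nightly job can apply the re-baselining advice mechanically. The following is a minimal sketch, assuming samples/sec is already being recorded; `check_throughput`, the 5% tolerance and the numbers are illustrative assumptions, not values from the source.

```python
def check_throughput(samples_per_sec, baseline, tolerance=0.05):
    """Compare a measured rate to a stored baseline.

    Returns (ok, relative_change). ok is False when throughput dropped by
    more than `tolerance` (an assumed threshold; tune it to your noise)."""
    assert baseline > 0.0 and samples_per_sec >= 0.0 and 0.0 <= tolerance < 1.0
    change = (samples_per_sec - baseline) / baseline
    return change >= -tolerance, change

baseline = 12_000.0                      # hypothetical rate recorded after the last tuning pass
for nightly in (11_700.0, 11_100.0):
    ok, change = check_throughput(nightly, baseline)
    print(f"{nightly:.0f} samples/s: {change:+.1%} vs baseline -> {'ok' if ok else 'REGRESSION'}")
```

Storing the baseline next to the library versions it was measured with makes it clear when an upgrade calls for a new baseline rather than a regression alert.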
How to scale it¶
Scaling out compute without widening the pipeline shifts the bottleneck onto data loading (the first force in Why use it): each data-parallel rank reads a distinct shard, so aggregate loader demand grows with cluster size, and faster GPUs raise the per-GPU byte rate.1 The scaling question is how much throughput you recover by cutting the input-wait fraction, and it is easy to misquote. Reducing GPU wait from 30% to 5% is a 6x reduction in wasted time (30/5), but the resulting throughput multiplier is smaller: compute per step is unchanged, so steps/sec rise only by (1 - 0.05) / (1 - 0.30) = 1.357x. This model separates the two readings so neither is overstated, checks the boundary as wait approaches total starvation, rejects invalid fractions, and asserts the speedup is the exact inverse of the reverse move over 500 random pairs.
# Input-starvation speedup model, validated (system python3, numpy). Run: python3 starvation.py
import numpy as np
def steps_per_sec(compute_s, wait_fraction):
    """Training throughput when the GPU spends `wait_fraction` of each iteration
    stalled on input. iter_time = compute / (1 - wait_fraction); rate = 1/iter_time.
    compute_s is the pure compute time of one step (constant across tunings)."""
    assert compute_s > 0.0
    assert 0.0 <= wait_fraction < 1.0, "wait fraction must be in [0,1)"
    iter_time = compute_s / (1.0 - wait_fraction)
    return 1.0 / iter_time

def speedup(from_wait, to_wait):
    """Throughput multiplier from reducing the input-wait fraction from_wait->to_wait.
    Compute per step is unchanged, so the ratio of rates is (1-to)/(1-from)."""
    assert 0.0 <= from_wait < 1.0 and 0.0 <= to_wait < 1.0
    return (1.0 - to_wait) / (1.0 - from_wait)

# happy path: the book's worked example, 30% wait tuned down to 5% wait.
s_before = steps_per_sec(compute_s=0.10, wait_fraction=0.30)
s_after = steps_per_sec(compute_s=0.10, wait_fraction=0.05)
ratio = s_after / s_before
# 0.95/0.70 = 1.357..., i.e. ~1.36x more steps/sec from the SAME compute.
assert abs(ratio - (0.95 / 0.70)) < 1e-12
assert abs(ratio - 1.3571428571428572) < 1e-9, ratio
assert abs(speedup(0.30, 0.05) - ratio) < 1e-12 # speedup() agrees with rate ratio
# the page's headline 6x is the reduction in WASTED time (30% -> 5% = 6x less idle),
# NOT the throughput multiplier. Assert both readings so neither is misquoted.
assert abs(0.30 / 0.05 - 6.0) < 1e-12 # 6x less wasted GPU time
assert ratio < 1.4 # throughput gain is ~1.36x, not 6x
# adversarial edge: wait_fraction -> 1 (total starvation) drives throughput -> 0.
assert steps_per_sec(0.10, 0.99) < steps_per_sec(0.10, 0.30)
assert np.isclose(steps_per_sec(0.10, 0.0), 10.0) # no wait => 1/0.1 = 10 steps/s
# fraction must be a valid probability; reject nonsense (corruption detection).
for bad in (-0.1, 1.0, 1.5):
    try:
        steps_per_sec(0.10, bad)
        raise AssertionError(f"should have rejected wait_fraction={bad}")
    except AssertionError as e:
        assert "wait fraction" in str(e)
# adversarial: speedup is the exact inverse of the reverse move (round-trip == 1).
rng = np.random.default_rng(5)
for _ in range(500):
    a, b = float(rng.uniform(0, 0.95)), float(rng.uniform(0, 0.95))
    assert abs(speedup(a, b) * speedup(b, a) - 1.0) < 1e-9
# the 50%-wasted claim: fully-fed vs half-idle is a 2x throughput span.
assert abs(speedup(0.50, 0.0) - 2.0) < 1e-12
print(f"throughput 30%->5% wait = {ratio:.4f}x; wasted-time reduction = "
      f"{0.30/0.05:.0f}x; fed vs 50%-idle = {speedup(0.5,0.0):.1f}x; all asserts pass")
Running it prints: throughput 30%->5% wait = 1.3571x; wasted-time reduction = 6x; fed vs 50%-idle = 2.0x; all asserts pass. The rejected-input check deliberately triggers an assertion: steps_per_sec refuses any wait_fraction below 0 or at or above 1.0 (a value of 1.0 would divide by zero, and anything above it would yield a negative iteration time), and the test confirms the guard fires, so a corrupted profiler reading cannot silently produce a nonsense throughput. As you scale ranks, the lever is keeping each rank's wait fraction low (more workers, sharded local reads, offloaded decode); the model bounds what any such tuning can return.
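The claim that aggregate loader demand grows with cluster size can be put into numbers. This is a back-of-envelope sketch: the function names, the 150 KB sample size and the per-worker rate are hypothetical, and it assumes workers scale linearly, which real pipelines only approximate.

```python
import math

def aggregate_read_gbps(num_ranks, samples_per_sec_per_rank, bytes_per_sample):
    """Storage read rate (GB/s, 1 GB = 1e9 bytes) needed to keep every rank fed,
    assuming each rank reads its own distinct shard with no cache hits."""
    assert num_ranks > 0 and samples_per_sec_per_rank > 0 and bytes_per_sample > 0
    return num_ranks * samples_per_sec_per_rank * bytes_per_sample / 1e9

def workers_per_rank(samples_per_sec_per_rank, samples_per_sec_per_worker):
    """Workers needed per rank if throughput scaled linearly with worker count
    (an idealization; contention makes the real number higher)."""
    assert samples_per_sec_per_rank > 0 and samples_per_sec_per_worker > 0
    return math.ceil(samples_per_sec_per_rank / samples_per_sec_per_worker)

# Illustrative numbers only: 2000 samples/s per GPU, 150 KB per sample.
demand_8 = aggregate_read_gbps(8, 2000, 150_000)
demand_64 = aggregate_read_gbps(64, 2000, 150_000)
workers = workers_per_rank(2000, 300)
print(f"8 ranks: {demand_8:.1f} GB/s; 64 ranks: {demand_64:.1f} GB/s; "
      f"workers per rank at 300 samples/s each: {workers}")
```

Per-rank worker count stays flat as the cluster grows, while the storage system must absorb the aggregate figure, which is why shard layout matters more at scale.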
Failure modes¶
- Adding workers to a device-bound pipeline. If the wall is the H2D copy or storage bandwidth, more num_workers only adds CPU contention and buys nothing. Split the cost with num_workers=0 versus a torch.cuda.Event H2D timer first; the diagnosis section is the guard.1
- num_workers=0 in a real run. With zero workers there is no background prefetch and the loop blocks on every batch; the queue-depth model's steady-state wait applies with Q=0. Use it only to isolate Python-loop cost, never in production.3
- Pinning without lifting ulimit -l. Large pinned buffers fail to allocate or fall back to swappable memory when the locked-memory cap is too low, silently erasing the pinned-copy win; set memlock unlimited in the shell / container / pod.1
- Custom batch type with no pin_memory() method. The loader's default pinning logic silently skips a batch type that is not a tensor or standard collection, so non_blocking=True copies are no longer truly async. Define the method.3
- Scattered small files. A directory of millions of tiny files triggers a metadata storm (per-file open/stat) and random seeks; the read-time model shows the strictly worse layout. Pack into shards.1
- DALI decode that bounces back to the CPU. Using DALI only for JPEG decode and then returning pixels to the CPU for augmentation adds host to device to host copies that can negate the offload gain. Keep the whole preprocess chain on the GPU and benchmark end-to-end.1
- Reading time_loader as the ceiling. Disabling GPU kernels also removes kernel-launch overhead, so isolated loader throughput reads faster than a real run; treat it as a lower bound on iteration time, not the achievable rate.1
- Compression that moves the bottleneck. Parquet/columnar compression trades I/O bandwidth for decode CPU/GPU; on an already compute-bound loop it makes things worse. It helps only when I/O is the wall and the decoder does not itself saturate.1
- Assuming starvation without profiling. A gap at iteration start can be input starvation, an NCCL/all-reduce wait, or compute/memory-bound execution; acting on the wrong one wastes effort. Read the timeline and classify first.1
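The locked-memory failure mode can be checked before a run starts. The following is a hedged sketch using Python's standard `resource` module (Unix only); it reads the limit the source refers to as ulimit -l and does not itself pin anything. The budget formula (batch bytes times prefetch depth times workers) and the example sizes are assumptions for illustration.

```python
import resource

def memlock_limit_bytes():
    """Soft RLIMIT_MEMLOCK in bytes, or None when it is unlimited."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_MEMLOCK)
    return None if soft == resource.RLIM_INFINITY else soft

def pinned_budget_ok(required_bytes, limit_bytes):
    """True when the limit is unlimited (None) or covers required_bytes."""
    assert required_bytes >= 0
    return limit_bytes is None or limit_bytes >= required_bytes

# Assumed budget: one ~154 MB batch, prefetch_factor=2, 8 workers.
required = 154_140_672 * 2 * 8
limit = memlock_limit_bytes()
shown = "unlimited" if limit is None else f"{limit} bytes"
print(f"memlock limit: {shown}; covers {required} bytes: {pinned_budget_ok(required, limit)}")
```

Running this inside the container or pod, rather than on the host, matters because each can carry a different limit.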
References¶
Related: Storage and data platform · GPUDirect Storage · NVIDIA DALI: GPU data loading and augmentation · NUMA affinity and CPU pinning for GPU pipelines · Profiling GPUs: Nsight Systems and Nsight Compute · Communication-computation overlap · Distributed training recipes · Glossary
1. Chris Fregly, AI Systems Performance Engineering (O'Reilly), Chapter 5, "GPU-Based Storage I/O Optimizations": sections "Efficient Data Loading and Preprocessing", "Scaling Out Workers as You Scale Out Number of GPUs", "Multimodal Data Processing with NVIDIA DALI", "Creating High-Quality LLM Datasets with NVIDIA NeMo Curator", "Monitoring Storage I/O", and "Sequential Versus Random Read Patterns". Source of the num_workers / prefetch_factor / pin_memory / persistent_workers guidance, the 30% to 5% (6x wasted-time reduction) starvation example, the 50%-wasted-GPU-time claim, the small-file/sharding advice, the DALI 800% to 200% CPU example, and the next(data_iterator) / num_workers=0 / Copy-lane diagnosis method.
2. Chris Fregly, AI Systems Performance Engineering (O'Reilly), Chapter 3, "OS, Docker, and Kubernetes Tuning for GPU-Based Environments": section "NUMA-Friendly Memory Allocation and Memory Pinning". Source of the pinned-memory 2 to 3x DMA and 10% to 20% end-to-end claims and the ulimit -l guidance.
3. PyTorch, torch.utils.data.DataLoader documentation: exact defaults/semantics for num_workers (default 0), prefetch_factor (default 2 when num_workers>0; total prefetched = 2 * num_workers), pin_memory, persistent_workers (default False), and the pin_memory() method requirement for custom batch types. https://docs.pytorch.org/docs/stable/data.html
4. WebDataset: tar-shard format, brace-notation shard URLs, IterableDataset compatibility with the standard DataLoader, shard-level splitting via get_worker_info, and the ~11% throughput gain from sequential sharded reads. https://github.com/webdataset/webdataset
5. NVIDIA DALI documentation: GPU-side JPEG decode and augmentation, declarative pipeline graph, internal prefetch/threading. https://docs.nvidia.com/deeplearning/dali/user-guide/docs/