Ray for GPU clusters¶
Scope: Ray as a Python-native distributed runtime for GPU clusters: tasks, actors, the head/worker model, and the Train/Serve/Data/RLlib libraries that make it the control plane for LLM RL.
The Ray, PyTorch, and vLLM snippets are reference templates on real APIs (those packages are not imported here); pin versions and validate before production use. The two resource-model examples (fractional-GPU scheduling and placement-group feasibility) are numpy-only, executed, and asserted, so the core scheduling math the page teaches is checked rather than asserted by hand.
What it is¶
Ray is an open-source distributed runtime (ray-project/ray) that runs Python code across a cluster as tasks (stateless functions) and actors (stateful classes). A head node runs the Global Control Store (GCS: cluster metadata, actor registry) and the scheduler; worker nodes run the tasks and actors, with GPUs exposed as logical resources. On top sit four libraries: Ray Train (distributed training, wraps torch/FSDP), Ray Serve (model serving, inference serving), Ray Data (streaming data), and RLlib (classic RL). As of mid-2026 the stable line is Ray 2.55.x with a 3.0 dev line in master; verify the exact tag on docs.ray.io.
Why use it¶
- Python-native: distribute existing Python by decorating with
@ray.remote; no separate cluster DSL. - Unifies train / serve / data under one runtime and one resource model, so a pipeline does not stitch three schedulers together.
- The controller for RL libraries: verl, slime, SkyRL, NeMo-RL and others use Ray actors to hold a long-lived rollout engine and a long-lived trainer as separate, coordinated actor groups that exchange weights and rollouts (RL libraries).
When to use it (and when not)¶
Ray is a runtime, not a cluster manager. It runs ON Kubernetes (Kubernetes) via KubeRay or ON Slurm (Slurm), or on raw VMs. Compared in orchestration overview.
- Use Ray for RL post-training, Python-native distributed pipelines, multi-model serving, and heterogeneous rollout-vs-train topologies.
- Prefer plain Kubernetes (Kubernetes) when the workload is a containerised microservice with no distributed-Python coordination need.
- Prefer plain Slurm (Slurm) for tightly-coupled MPI/
torchrunpretraining on bare metal, though Ray-on-Slurm is a supported pattern for actor workloads on an HPC cluster.
Architecture¶
flowchart TB
subgraph Head["Ray head node"]
GCS["GCS (global control store)"]
SCHED["Scheduler"]
end
subgraph Libs["Ray libraries"]
TR["Ray Train"]
SV["Ray Serve"]
DT["Ray Data"]
RL["RLlib"]
end
Head --> W1["Worker (GPU): actors + tasks"]
Head --> W2["Worker (GPU): actors + tasks"]
W1 --- W2
Libs -.->|"submit to"| Head
How to use it¶
A task is a remote function; an actor is a remote class. GPUs are requested via num_gpus. Reference template (needs a running Ray):
import ray
ray.init() # connects to a running cluster, or starts a local one
@ray.remote(num_gpus=1) # stateless task on 1 GPU
def embed(batch: list[str]) -> int:
return len(batch) # real work runs a model on the GPU
@ray.remote(num_gpus=1) # stateful actor holding a model on 1 GPU
class Policy:
def act(self, obs: str) -> int:
return len(obs)
futures = [embed.remote(["a", "b"]) for _ in range(4)]
print(ray.get(futures)) # gather task results
policy = Policy.remote() # long-lived actor handle
print(ray.get(policy.act.remote("hello")))
What num_gpus actually drives is Ray's resource-accounting scheduler: each node advertises a GPU capacity, a request reserves a (possibly fractional) slice, tasks that do not fit stay PENDING, and one task is never split across hosts. The scheduler is a greedy pack over free capacity. This numpy model reproduces that admission logic and asserts it, including fractional packing, rejection of an over-sized request, and equivalence to a slow brute-force reference over 2000 random instances:
# ray_resources.py -- numpy model of Ray's fractional-GPU resource accounting and
# greedy bin-pack admission (the num_gpus request model). Executed and asserted:
# fractional packing, over-subscription rejection, and a slow-reference equivalence.
import numpy as np
def schedule(free_gpu, reqs): # free_gpu[i]=GPU left on node i; reqs=list of num_gpus
free = free_gpu.astype(np.float64).copy()
place = np.full(len(reqs), -1, np.int64) # -1 = unschedulable (task stays PENDING)
for k, need in enumerate(reqs):
fits = np.flatnonzero(free + 1e-9 >= need) # first node with capacity (Ray packs by default)
if fits.size:
n = fits[0]; free[n] -= need; place[k] = n
return place, free
# 2 nodes x 1 GPU each. Four 0.5-GPU actors pack two-per-GPU; a 5th 0.5 has no room.
free0 = np.array([1.0, 1.0])
place, free = schedule(free0, [0.5, 0.5, 0.5, 0.5, 0.5])
assert (place == np.array([0, 0, 1, 1, -1])).all() # fractional packing, exact placement
assert np.isclose(free, 0.0).all() # both GPUs fully booked
assert (place[:4] >= 0).all() and place[4] == -1 # 5th is unschedulable, not silently dropped
# conservation: GPU consumed by placed actors exactly equals free-capacity drop,
# and never exceeds total capacity (no over-subscription).
placed = place >= 0
demand_used = np.full(5, 0.5)[placed].sum()
assert np.isclose(demand_used, free0.sum() - free.sum()) # bookkeeping is exact
assert demand_used <= free0.sum() + 1e-9 # capacity is never exceeded
# adversarial 1: a request larger than any single node's free GPU must be rejected,
# NOT split across nodes (Ray never fragments one task across hosts).
p2, _ = schedule(np.array([1.0, 1.0]), [1.5])
assert p2[0] == -1, "a 1.5-GPU task on 1-GPU nodes must stay pending, never split"
# adversarial 2: equivalence to a slow brute-force reference on random instances.
def slow_ref(free_gpu, reqs): # explicit loop, no numpy vectorization
free = list(map(float, free_gpu)); out = []
for need in reqs:
chosen = -1
for i in range(len(free)):
if free[i] + 1e-9 >= need:
chosen = i; free[i] -= need; break
out.append(chosen)
return out
rng = np.random.default_rng(7)
for _ in range(2000):
fg = rng.integers(0, 5, size=rng.integers(1, 6)).astype(float)
rq = (rng.integers(1, 9, size=rng.integers(0, 12)) / 4.0).tolist() # 0.25..2.0 GPUs
fast, _ = schedule(fg, rq)
assert fast.tolist() == slow_ref(fg, rq), (fg, rq)
print("resources OK: fractional pack + reject over-request + 2000-case slow-ref match")
How to develop with it¶
Ray Train wraps a PyTorch loop in a TorchTrainer; ScalingConfig sets worker count and GPU use; Ray sets up the distributed (NCCL) process group per worker. Reference template (needs Ray plus PyTorch):
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
def train_loop_per_worker(config: dict) -> None:
import ray.train.torch
model = build_model()
model = ray.train.torch.prepare_model(model) # wraps in DDP/device move
for batch in get_loader():
loss = step(model, batch)
ray.train.report({"loss": float(loss)})
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
)
trainer.fit()
How to scale it¶
Ray scales by adding worker nodes (manual or via the autoscaler) and by pinning groups of work to nodes with placement groups.
- Autoscaling: on Kubernetes, a
RayClusterwithenableInTreeAutoscalinggrows/shrinks worker pods on demand; the autoscaler reads pending tasks/actors. - Multi-node: Ray Train spans nodes transparently; one worker per GPU, NCCL across nodes (needs RDMA, see below).
- Placement groups reserve bundles of resources with a strategy:
STRICT_PACKco-locates a tightly-coupled group on one node (intra-node NVLink);STRICT_SPREADforces one-per-node for a NCCL group spread across nodes.
Reference template (needs a running Ray):
from ray.util.placement_group import placement_group
# 8 GPUs packed onto one node (exploit NVLink for an in-node collective)
pg = placement_group(bundles=[{"GPU": 1, "CPU": 4}] * 8, strategy="STRICT_PACK")
ray.get(pg.ready())
The two strategies encode different feasibility constraints, and getting them backwards is a real failure (below): STRICT_PACK needs one node large enough for the whole group, STRICT_SPREAD needs one distinct node per bundle. This numpy model asserts both, including the boundary where demand exactly equals a node's capacity and the case where a topology that satisfies one strategy provably fails the other. strict_spread_ok is a greedy sort-and-match; the block also asserts it equals an exhaustive one-bundle-per-node search over 3000 random instances:
# placement_group.py -- numpy model of STRICT_PACK vs STRICT_SPREAD feasibility.
# STRICT_PACK: all bundles must fit on ONE node (intra-node NVLink group).
# STRICT_SPREAD: at most one bundle per node (one-rank-per-node NCCL group).
# Executed and asserted, including the boundary case where each strategy flips.
import numpy as np
def strict_pack_ok(node_free_gpu, bundle_gpu): # every bundle co-located on a single node
total = float(np.sum(bundle_gpu))
return bool(np.any(node_free_gpu + 1e-9 >= total)) # some node holds the whole group
def strict_spread_ok(node_free_gpu, bundle_gpu): # one bundle per distinct node
b = np.sort(bundle_gpu)[::-1] # largest bundles first
free = np.sort(node_free_gpu)[::-1].astype(float).copy()
if b.size > free.size: # cannot place more bundles than nodes
return False
for i, need in enumerate(b): # greedy: i-th largest bundle -> i-th freest node
if free[i] + 1e-9 < need:
return False
return True
# 8-GPU NVLink group as 8 x 1-GPU bundles. PACK needs one 8-GPU node.
bundles = np.ones(8)
assert strict_pack_ok(np.array([8.0, 2.0]), bundles) is True # the 8-GPU node holds it
assert strict_pack_ok(np.array([4.0, 4.0]), bundles) is False # 4+4 split cannot satisfy STRICT_PACK
# SPREAD of the same 8 bundles needs 8 distinct nodes with >=1 GPU each.
assert strict_spread_ok(np.ones(8), bundles) is True # 8 nodes x 1 GPU: one bundle each
assert strict_spread_ok(np.ones(7), bundles) is False # only 7 nodes: 8th bundle unplaceable
# boundary: total demand exactly equals a node's capacity -> PACK just fits.
assert strict_pack_ok(np.array([8.0]), np.ones(8)) is True # 8 == 8, packs exactly
assert strict_pack_ok(np.array([7.99]), np.ones(8)) is False # one GPU short -> infeasible
# adversarial: PACK and SPREAD are genuinely different constraints, not interchangeable.
# 8 GPUs on a single node satisfies PACK but FAILS SPREAD (only one node available).
one_fat_node = np.array([8.0])
assert strict_pack_ok(one_fat_node, bundles) is True
assert strict_spread_ok(one_fat_node, bundles) is False
# 8 single-GPU nodes satisfies SPREAD but FAILS PACK (no node holds all 8).
eight_thin = np.ones(8)
assert strict_spread_ok(eight_thin, bundles) is True
assert strict_pack_ok(eight_thin, bundles) is False
# adversarial 2: heterogeneous bundles -- SPREAD must match the biggest bundle to a big-enough node.
hetero = np.array([4.0, 1.0, 1.0]) # one 4-GPU bundle + two 1-GPU
assert strict_spread_ok(np.array([4.0, 1.0, 1.0]), hetero) is True # 4->4node, 1->1, 1->1
assert strict_spread_ok(np.array([2.0, 2.0, 2.0]), hetero) is False # no node fits the 4-GPU bundle
# adversarial 3: the sort-and-match greedy equals an exhaustive one-bundle-per-node search.
from itertools import permutations
def spread_bruteforce(node_free, bundle): # true iff SOME injective bundle->node assignment fits
if len(bundle) > len(node_free):
return False
for perm in permutations(range(len(node_free)), len(bundle)):
if all(node_free[perm[k]] + 1e-9 >= bundle[k] for k in range(len(bundle))):
return True
return False
rng = np.random.default_rng(11)
for _ in range(3000):
nf = rng.integers(0, 5, size=rng.integers(1, 6)).astype(float)
bd = rng.integers(1, 5, size=rng.integers(1, 5)).astype(float)
assert strict_spread_ok(nf, bd) == spread_bruteforce(nf, bd), (nf, bd)
print("placement groups OK: PACK vs SPREAD feasibility, boundary, non-interchangeability, 3000-case brute-force match")
How to serve inference with it¶
Ray Serve is the serving library (inference serving). Ray Serve LLM builds an OpenAI-compatible app backed by vLLM, with autoscaling and multi-model routing; most engine_kwargs that work with vllm serve pass through. Reference template (needs Ray Serve plus vLLM):
from ray.serve.llm import LLMConfig, build_openai_app
llm = LLMConfig(
model_loading_config={"model_id": "qwen3-8b",
"model_source": "Qwen/Qwen3-8B"},
engine_kwargs={"tensor_parallel_size": 2, "max_model_len": 32768},
)
app = build_openai_app({"llm_configs": [llm]}) # serve.run(app) to deploy
How to integrate with it¶
Ray is the integration substrate three ways: as a controller under RL post-training libraries, as a workload on a cluster manager, and as a fine-tuning runner.
- As the controller for RL libraries. verl, slime, SkyRL, NeMo-RL, and OpenRLHF all use Ray actors to hold the vLLM/SGLang rollout engine and the FSDP/Megatron trainer as separate, coordinated actor groups, shuttling weights between them (RL libraries, fine-tuning and post-training, GRPO, DPO). The delta-sync path between those actor groups is its own topic (delta weight sync).
- As a workload on a cluster manager. Ray runs ON Kubernetes via KubeRay (the
RayCluster/RayJob/RayServiceCRDs, Kubernetes) or ON Slurm via the Ray-on-Slurm launch pattern (Slurm), so the cluster scheduler and GPU Operator keep governing quota and topology. Prefer KubeRay to a hand-rolled second stack (orchestration overview). - As a fine-tuning runner. Ray Train runs the SFT/LoRA loop (SFT and LoRA) directly as a distributed
TorchTrainer(the develop-section template above), the non-RL path to the same GPUs.
How to run it on optimised hardware¶
- RDMA into workers: expose IB/RoCE NICs to Ray worker pods/nodes so NCCL uses GPUDirect RDMA, not TCP. On KubeRay, request the RDMA resource in the worker container (e.g.
rdma/rdma_shared_device_a), exactly as for any pod (Kubernetes for GPUs). - NCCL: Ray Train collectives run over NCCL. Set
NCCL_IB_HCA,NCCL_NET_GDR_LEVEL=SYS, optionallyNCCL_NVLS_ENABLEfor NVLink SHARP; confirm[GDRDMA]inNCCL_DEBUG=INFO(performance tuning). ACS must be off for P2P/GDR. - Placement groups for topology:
STRICT_PACKto keep a collective inside one node's NVLink/NVSwitch domain;STRICT_SPREADfor one-rank-per-node. Bundles are not ordered by physical GPU rank, so pin device/affinity inside the worker, not by bundle index (the Blackwell platform).
How to run it in production¶
On Kubernetes, run Ray through the KubeRay CRDs so the cluster scheduler and GPU Operator keep governing it, and pick the CRD by lifecycle:
RayJobfor run-to-completion training (the cookbook recipe below):shutdownAfterJobFinishes: truetears the cluster down when the job exits, so a finished run frees its GPUs.RayServicefor online serving: it adds high availability and zero-downtime rolling upgrades in front of a Ray Serve app, which a bareRayClusterdoes not.- GCS fault tolerance: the head node's GCS is a single point of failure, so back it with external Redis; then a head restart recovers cluster state instead of dropping every actor.
RayServicewires this in for serving. - Pin the image and expose RDMA at the pod: pin an explicit
rayproject/ray:<ver>-gputag on head and workers (mismatched Ray versions across nodes fail to form a cluster), and request the RDMA resource in the worker container so NCCL gets GPUDirect RDMA rather than TCP.
How to maintain it¶
- Pin the version line. As of mid-2026 the stable line is Ray 2.55.x with a 3.0 dev line in
master; verify the exact tag on docs.ray.io and keep head, workers, and the KubeRay operator in lockstep. For RL, treat the Ray build and the inference-engine build as one versioned unit. - Size the object store and watch spill. Large objects evicted from the in-memory object store to disk stall pipelines; set
object_store_memorydeliberately and alert on spill metrics rather than discovering spill through a throughput cliff. - Watch the head and GCS. Monitor head-node liveness and the GCS-backing Redis; a head loss without GCS fault tolerance drops the cluster.
- Re-verify the fabric after changes. After any node, image, or NCCL-env change, re-confirm
[GDRDMA]inNCCL_DEBUG=INFOfrom inside a worker and re-runnccl-tests, because a silent TCP fallback craters throughput without erroring (workload recipes).
Cookbook (common use cases)¶
Reference templates (each needs a running Ray plus the noted library); the resource math they rely on is the validated numpy above.
1. Ray Data batch GPU inference (offline scoring across workers):
import ray
class Scorer: # actor-style stateful UDF
def __init__(self) -> None: self.model = load_model() # once per worker
def __call__(self, batch: dict) -> dict:
batch["score"] = self.model(batch["text"]); return batch
ds = ray.data.read_parquet("s3://bucket/prompts/")
ds.map_batches(Scorer, concurrency=8, num_gpus=1).write_parquet("s3://bucket/out/")
2. Ray Serve LLM deployment (run the build_openai_app result from the serve-inference section):
3. KubeRay RayJob (run-to-completion on Kubernetes Kubernetes):
apiVersion: ray.io/v1
kind: RayJob
metadata: { name: rl-train, namespace: ml }
spec:
entrypoint: python train_rl.py
shutdownAfterJobFinishes: true
rayClusterSpec:
headGroupSpec:
template:
spec: { containers: [{ name: ray-head, image: rayproject/ray:2.55.1-py311-gpu }] }
workerGroupSpecs:
- groupName: gpu-workers
replicas: 8
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray:2.55.1-py311-gpu
resources:
limits: { nvidia.com/gpu: 8, rdma/rdma_shared_device_a: 1 }
Failure modes¶
- Workers without RDMA → Ray Train NCCL silently falls back to TCP; throughput craters. Validate with
nccl-testsinside a worker (workload recipes). - Ungoverned second stack: running Ray beside Kubernetes without KubeRay duplicates scheduling and bypasses quota; prefer KubeRay so the cluster scheduler and GPU Operator govern Ray pods (orchestration overview).
- GCS as a single point: a head-node loss drops the cluster unless GCS fault tolerance (external Redis) is configured;
RayServiceadds HA for serving. - Bundle ≠ physical GPU order: never assume bundle index maps to GPU rank for NVLink locality; set device affinity in the worker.
- Object store spill: large objects evicted to disk stall pipelines; size
object_store_memoryand watch spill metrics.
References¶
- Ray docs: https://docs.ray.io/en/latest/
- KubeRay: https://docs.ray.io/en/latest/cluster/kubernetes/index.html · repo https://github.com/ray-project/kuberay
- Ray Train (PyTorch): https://docs.ray.io/en/latest/train/getting-started-pytorch.html ·
TorchTrainerhttps://docs.ray.io/en/latest/train/api/doc/ray.train.torch.TorchTrainer.html - Ray Serve LLM: https://docs.ray.io/en/latest/serve/index.html ·
build_openai_apphttps://docs.ray.io/en/latest/serve/api/doc/ray.serve.llm.build_openai_app.html - Placement groups: https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html
- Ray on Slurm: https://docs.ray.io/en/latest/cluster/vms/user-guides/community/slurm.html
Related: Inference · Orchestration · RL Libraries · Kubernetes · Slurm · SFT/LoRA · Glossary