Ray for GPU clusters

Scope: Ray as a Python-native distributed runtime for GPU clusters, covering tasks, actors, the head/worker model, and the Train/Serve/Data/RLlib libraries that make it the control plane for LLM RL.

The Ray, PyTorch, and vLLM snippets are reference templates written against real APIs (those packages are not imported here); pin versions and validate before production use. The two resource-model examples (fractional-GPU scheduling and placement-group feasibility) are numpy-only and carry executable assertions, so the core scheduling math the page teaches is checked by running code rather than argued by hand.

What it is

Ray is an open-source distributed runtime (ray-project/ray) that runs Python code across a cluster as tasks (stateless functions) and actors (stateful classes). A head node runs the Global Control Store (GCS: cluster metadata, actor registry) and coordinates scheduling; worker nodes run the tasks and actors, with GPUs exposed as logical resources. On top sit four libraries: Ray Train (distributed training, wraps torch/FSDP), Ray Serve (model and inference serving), Ray Data (streaming data), and RLlib (classic RL). As of mid-2026 the stable line is Ray 2.55.x with a 3.0 dev line in master; verify the exact tag on docs.ray.io.

Why use it

  • Python-native: distribute existing Python by decorating with @ray.remote; no separate cluster DSL.
  • Unifies train / serve / data under one runtime and one resource model, so a pipeline does not stitch three schedulers together.
  • The controller for RL libraries: verl, slime, SkyRL, NeMo-RL and others use Ray actors to hold a long-lived rollout engine and a long-lived trainer as separate, coordinated actor groups that exchange weights and rollouts (RL libraries).

When to use it (and when not)

Ray is a runtime, not a cluster manager. It runs on top of Kubernetes via KubeRay (see Kubernetes), on top of Slurm (see Slurm), or on raw VMs; the orchestration overview compares these options.

  • Use Ray for RL post-training, Python-native distributed pipelines, multi-model serving, and heterogeneous rollout-vs-train topologies.
  • Prefer plain Kubernetes (see Kubernetes) when the workload is a containerised microservice with no need for distributed-Python coordination.
  • Prefer plain Slurm (see Slurm) for tightly-coupled MPI/torchrun pretraining on bare metal, though Ray-on-Slurm is a supported pattern for actor workloads on an HPC cluster.

Architecture

flowchart TB
  subgraph Head["Ray head node"]
    GCS["GCS (global control store)"]
    SCHED["Scheduler"]
  end
  subgraph Libs["Ray libraries"]
    TR["Ray Train"]
    SV["Ray Serve"]
    DT["Ray Data"]
    RL["RLlib"]
  end
  Head --> W1["Worker (GPU): actors + tasks"]
  Head --> W2["Worker (GPU): actors + tasks"]
  W1 --- W2
  Libs -.->|"submit to"| Head

How to use it

A task is a remote function; an actor is a remote class. GPUs are requested via num_gpus. Reference template (needs a running Ray):

import ray
ray.init()  # connects to a running cluster, or starts a local one

@ray.remote(num_gpus=1)               # stateless task on 1 GPU
def embed(batch: list[str]) -> int:
    return len(batch)                 # real work runs a model on the GPU

@ray.remote(num_gpus=1)               # stateful actor holding a model on 1 GPU
class Policy:
    def act(self, obs: str) -> int:
        return len(obs)

futures = [embed.remote(["a", "b"]) for _ in range(4)]
print(ray.get(futures))               # gather task results
policy = Policy.remote()              # long-lived actor handle
print(ray.get(policy.act.remote("hello")))

What num_gpus actually drives is Ray's resource-accounting scheduler: each node advertises a GPU capacity, a request reserves a (possibly fractional) slice, tasks that do not fit stay PENDING, and one task is never split across hosts. The model below simplifies node selection to a first-fit greedy pack over free capacity; Ray's default policy also weighs locality and load balance when choosing among feasible nodes, but the admission rule (fit on a single node or stay pending) is the same. This numpy model reproduces that admission logic and asserts it, including fractional packing, rejection of an over-sized request, and equivalence to a slow brute-force reference over 2000 random instances:

# ray_resources.py -- numpy model of Ray's fractional-GPU resource accounting and
# greedy bin-pack admission (the num_gpus request model). Executed and asserted:
# fractional packing, over-subscription rejection, and a slow-reference equivalence.
import numpy as np

def schedule(free_gpu, reqs):                       # free_gpu[i]=GPU left on node i; reqs=list of num_gpus
    free = free_gpu.astype(np.float64).copy()
    place = np.full(len(reqs), -1, np.int64)        # -1 = unschedulable (task stays PENDING)
    for k, need in enumerate(reqs):
        fits = np.flatnonzero(free + 1e-9 >= need)  # first-fit: lowest-index node with capacity (simplified pack policy)
        if fits.size:
            n = fits[0]; free[n] -= need; place[k] = n
    return place, free

# 2 nodes x 1 GPU each. Four 0.5-GPU actors pack two-per-GPU; a 5th 0.5 has no room.
free0 = np.array([1.0, 1.0])
place, free = schedule(free0, [0.5, 0.5, 0.5, 0.5, 0.5])
assert (place == np.array([0, 0, 1, 1, -1])).all()          # fractional packing, exact placement
assert np.isclose(free, 0.0).all()                          # both GPUs fully booked
assert (place[:4] >= 0).all() and place[4] == -1            # 5th is unschedulable, not silently dropped

# conservation: GPU consumed by placed actors exactly equals free-capacity drop,
# and never exceeds total capacity (no over-subscription).
placed = place >= 0
demand_used = np.full(5, 0.5)[placed].sum()
assert np.isclose(demand_used, free0.sum() - free.sum())     # bookkeeping is exact
assert demand_used <= free0.sum() + 1e-9                      # capacity is never exceeded

# adversarial 1: a request larger than any single node's free GPU must be rejected,
# NOT split across nodes (Ray never fragments one task across hosts).
p2, _ = schedule(np.array([1.0, 1.0]), [1.5])
assert p2[0] == -1, "a 1.5-GPU task on 1-GPU nodes must stay pending, never split"

# adversarial 2: equivalence to a slow brute-force reference on random instances.
def slow_ref(free_gpu, reqs):                        # explicit loop, no numpy vectorization
    free = list(map(float, free_gpu)); out = []
    for need in reqs:
        chosen = -1
        for i in range(len(free)):
            if free[i] + 1e-9 >= need:
                chosen = i; free[i] -= need; break
        out.append(chosen)
    return out
rng = np.random.default_rng(7)
for _ in range(2000):
    fg = rng.integers(0, 5, size=rng.integers(1, 6)).astype(float)
    rq = (rng.integers(1, 9, size=rng.integers(0, 12)) / 4.0).tolist()  # 0.25..2.0 GPUs
    fast, _ = schedule(fg, rq)
    assert fast.tolist() == slow_ref(fg, rq), (fg, rq)
print("resources OK: fractional pack + reject over-request + 2000-case slow-ref match")

How to develop with it

Ray Train wraps a PyTorch loop in a TorchTrainer; ScalingConfig sets worker count and GPU use; Ray sets up the distributed (NCCL) process group per worker. Reference template (needs Ray plus PyTorch):

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

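def train_loop_per_worker(config: dict) -> None:
    import ray.train.torch
    model = build_model()                         # placeholder: user-supplied model factory
    model = ray.train.torch.prepare_model(model)  # moves to the worker's device, wraps in DDP
    for batch in get_loader():                    # placeholder: user-supplied data loader (must be non-empty)
        loss = step(model, batch)                 # placeholder: forward/backward/optimizer step
    ray.train.report({"loss": float(loss)})       # report the final loss once, after the loop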

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
)
trainer.fit()
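
The invariant that the DDP wrapper maintains can be checked without a cluster. The sketch below is a pure-Python toy (no Ray or PyTorch; `grad_mse` and `shard` are hypothetical helper names) showing that averaging per-worker gradients over equal-sized shards reproduces the full-batch gradient, which is what an averaging all-reduce across the 8 workers is meant to deliver.

```python
# Toy data-parallel check: mean of per-worker gradients == full-batch gradient
# when every worker holds an equal-sized shard. Pure Python, no Ray/PyTorch.

def grad_mse(w: float, xs: list[float], ys: list[float]) -> float:
    """d/dw of mean((w*x - y)^2) over one batch."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)

def shard(items: list[float], num_workers: int) -> list[list[float]]:
    """Split into equal contiguous shards; unequal shards would need weighting."""
    size, rem = divmod(len(items), num_workers)
    if rem != 0:
        raise ValueError("this sketch assumes the batch divides evenly")
    return [items[i * size:(i + 1) * size] for i in range(num_workers)]

xs = [float(i) for i in range(1, 17)]          # 16 samples
ys = [3.0 * x + 1.0 for x in xs]
w = 0.5
num_workers = 8                                 # mirrors ScalingConfig(num_workers=8)

local = [grad_mse(w, sx, sy)
         for sx, sy in zip(shard(xs, num_workers), shard(ys, num_workers))]
allreduced = sum(local) / num_workers           # result of an averaging all-reduce
full = grad_mse(w, xs, ys)                      # single-process reference
assert abs(allreduced - full) < 1e-9 * max(1.0, abs(full))
```

The equality only holds for equal shard sizes, which is one reason a distributed sampler pads or drops samples so every worker sees the same batch size.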

How to scale it

Ray scales by adding worker nodes (manual or via the autoscaler) and by pinning groups of work to nodes with placement groups.

  • Autoscaling: on Kubernetes, a RayCluster with enableInTreeAutoscaling grows/shrinks worker pods on demand; the autoscaler reads pending tasks/actors.
  • Multi-node: Ray Train spans nodes transparently; one worker per GPU, NCCL across nodes (needs RDMA, see below).
  • Placement groups reserve bundles of resources with a strategy: STRICT_PACK co-locates a tightly-coupled group on one node (intra-node NVLink); STRICT_SPREAD forces one bundle per node for an NCCL group spread across nodes.
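
To make the autoscaling bullet concrete, here is a rough estimate of how many identical worker nodes a backlog of pending GPU requests would need. This is not Ray's autoscaler algorithm; the function name `nodes_to_add` and the first-fit-decreasing heuristic are assumptions chosen for the sketch.

```python
def nodes_to_add(pending: list[float], gpus_per_node: float) -> int:
    """First-fit-decreasing estimate of new nodes needed for pending GPU requests.

    A request larger than one node can never be placed (tasks are not split
    across hosts), so it raises instead of being counted.
    """
    bins: list[float] = []                      # free GPU on each hypothetical new node
    for need in sorted(pending, reverse=True):
        if need > gpus_per_node + 1e-9:
            raise ValueError(f"{need} GPUs cannot fit on a {gpus_per_node}-GPU node")
        for i, free in enumerate(bins):
            if free + 1e-9 >= need:             # reuse the first node with room
                bins[i] = free - need
                break
        else:
            bins.append(gpus_per_node - need)   # no node had room: open a new one
    return len(bins)

assert nodes_to_add([1.0] * 10, 8.0) == 2       # ten 1-GPU actors need two 8-GPU nodes
assert nodes_to_add([0.5] * 4, 1.0) == 2        # fractional requests share a GPU
assert nodes_to_add([], 8.0) == 0               # nothing pending, nothing to add
```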

Reference template (needs a running Ray):

import ray
from ray.util.placement_group import placement_group

# 8 GPUs packed onto one node (exploit NVLink for an in-node collective)
pg = placement_group(bundles=[{"GPU": 1, "CPU": 4}] * 8, strategy="STRICT_PACK")
ray.get(pg.ready())

The two strategies encode different feasibility constraints, and getting them backwards is a real failure (below): STRICT_PACK needs one node large enough for the whole group, STRICT_SPREAD needs one distinct node per bundle. This numpy model asserts both, including the boundary where demand exactly equals a node's capacity and the case where a topology that satisfies one strategy provably fails the other. strict_spread_ok is a greedy sort-and-match; the block also asserts it equals an exhaustive one-bundle-per-node search over 3000 random instances:

# placement_group.py -- numpy model of STRICT_PACK vs STRICT_SPREAD feasibility.
# STRICT_PACK: all bundles must fit on ONE node (intra-node NVLink group).
# STRICT_SPREAD: at most one bundle per node (one-rank-per-node NCCL group).
# Executed and asserted, including the boundary case where each strategy flips.
import numpy as np

def strict_pack_ok(node_free_gpu, bundle_gpu):      # every bundle co-located on a single node
    total = float(np.sum(bundle_gpu))
    return bool(np.any(node_free_gpu + 1e-9 >= total))   # some node holds the whole group

def strict_spread_ok(node_free_gpu, bundle_gpu):    # one bundle per distinct node
    b = np.sort(bundle_gpu)[::-1]                    # largest bundles first
    free = np.sort(node_free_gpu)[::-1].astype(float).copy()
    if b.size > free.size:                           # cannot place more bundles than nodes
        return False
    for i, need in enumerate(b):                     # greedy: i-th largest bundle -> i-th freest node
        if free[i] + 1e-9 < need:
            return False
    return True

# 8-GPU NVLink group as 8 x 1-GPU bundles. PACK needs one 8-GPU node.
bundles = np.ones(8)
assert strict_pack_ok(np.array([8.0, 2.0]), bundles) is True     # the 8-GPU node holds it
assert strict_pack_ok(np.array([4.0, 4.0]), bundles) is False    # 4+4 split cannot satisfy STRICT_PACK
# SPREAD of the same 8 bundles needs 8 distinct nodes with >=1 GPU each.
assert strict_spread_ok(np.ones(8), bundles) is True             # 8 nodes x 1 GPU: one bundle each
assert strict_spread_ok(np.ones(7), bundles) is False            # only 7 nodes: 8th bundle unplaceable

# boundary: total demand exactly equals a node's capacity -> PACK just fits.
assert strict_pack_ok(np.array([8.0]), np.ones(8)) is True       # 8 == 8, packs exactly
assert strict_pack_ok(np.array([7.99]), np.ones(8)) is False     # 0.01 GPU short -> infeasible

# adversarial: PACK and SPREAD are genuinely different constraints, not interchangeable.
# 8 GPUs on a single node satisfies PACK but FAILS SPREAD (only one node available).
one_fat_node = np.array([8.0])
assert strict_pack_ok(one_fat_node, bundles) is True
assert strict_spread_ok(one_fat_node, bundles) is False
# 8 single-GPU nodes satisfies SPREAD but FAILS PACK (no node holds all 8).
eight_thin = np.ones(8)
assert strict_spread_ok(eight_thin, bundles) is True
assert strict_pack_ok(eight_thin, bundles) is False

# adversarial 2: heterogeneous bundles -- SPREAD must match the biggest bundle to a big-enough node.
hetero = np.array([4.0, 1.0, 1.0])                  # one 4-GPU bundle + two 1-GPU
assert strict_spread_ok(np.array([4.0, 1.0, 1.0]), hetero) is True   # 4->4node, 1->1, 1->1
assert strict_spread_ok(np.array([2.0, 2.0, 2.0]), hetero) is False  # no node fits the 4-GPU bundle

# adversarial 3: the sort-and-match greedy equals an exhaustive one-bundle-per-node search.
from itertools import permutations
def spread_bruteforce(node_free, bundle):           # true iff SOME injective bundle->node assignment fits
    if len(bundle) > len(node_free):
        return False
    for perm in permutations(range(len(node_free)), len(bundle)):
        if all(node_free[perm[k]] + 1e-9 >= bundle[k] for k in range(len(bundle))):
            return True
    return False
rng = np.random.default_rng(11)
for _ in range(3000):
    nf = rng.integers(0, 5, size=rng.integers(1, 6)).astype(float)
    bd = rng.integers(1, 5, size=rng.integers(1, 5)).astype(float)
    assert strict_spread_ok(nf, bd) == spread_bruteforce(nf, bd), (nf, bd)
print("placement groups OK: PACK vs SPREAD feasibility, boundary, non-interchangeability, 3000-case brute-force match")

How to serve inference with it

Ray Serve is the serving library (inference serving). Ray Serve LLM builds an OpenAI-compatible app backed by vLLM, with autoscaling and multi-model routing; most engine_kwargs that work with vllm serve pass through. Reference template (needs Ray Serve plus vLLM):

from ray.serve.llm import LLMConfig, build_openai_app

llm = LLMConfig(
    model_loading_config={"model_id": "qwen3-8b",
                          "model_source": "Qwen/Qwen3-8B"},
    engine_kwargs={"tensor_parallel_size": 2, "max_model_len": 32768},
)
app = build_openai_app({"llm_configs": [llm]})  # serve.run(app) to deploy
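
Because tensor_parallel_size=2 means every replica occupies two GPUs, a quick capacity check helps before setting autoscaling bounds. The sketch below assumes each replica's tensor-parallel group must sit on a single node; that is an assumption of this example, not a statement about how Ray Serve LLM places workers, and `max_replicas` is a hypothetical helper.

```python
def max_replicas(node_gpus: list[int], tensor_parallel_size: int) -> int:
    """Upper bound on replicas when each replica needs `tensor_parallel_size`
    GPUs on one node (in-node tensor parallelism assumed)."""
    if tensor_parallel_size < 1:
        raise ValueError("tensor_parallel_size must be >= 1")
    return sum(g // tensor_parallel_size for g in node_gpus)

assert max_replicas([8, 8], 2) == 8        # two 8-GPU nodes hold eight TP=2 replicas
assert max_replicas([1, 1, 1, 1], 2) == 0  # four 1-GPU nodes hold none under this assumption
assert max_replicas([3], 2) == 1           # the odd GPU is stranded
```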

How to integrate with it

Ray is the integration substrate three ways: as a controller under RL post-training libraries, as a workload on a cluster manager, and as a fine-tuning runner.

  • As the controller for RL libraries. verl, slime, SkyRL, NeMo-RL, and OpenRLHF all use Ray actors to hold the vLLM/SGLang rollout engine and the FSDP/Megatron trainer as separate, coordinated actor groups, shuttling weights between them (RL libraries, fine-tuning and post-training, GRPO, DPO). The delta-sync path between those actor groups is its own topic (delta weight sync).
  • As a workload on a cluster manager. Ray runs ON Kubernetes via KubeRay (the RayCluster / RayJob / RayService CRDs, Kubernetes) or ON Slurm via the Ray-on-Slurm launch pattern (Slurm), so the cluster scheduler and GPU Operator keep governing quota and topology. Prefer KubeRay to a hand-rolled second stack (orchestration overview).
  • As a fine-tuning runner. Ray Train runs the SFT/LoRA loop (SFT and LoRA) directly as a distributed TorchTrainer (the develop-section template above), the non-RL path to the same GPUs.
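
The controller pattern in the first bullet can be sketched without Ray. In the toy below, plain Python objects stand in for the two actor groups (in a real deployment each would be a Ray actor and every call a remote call); the class names, the scalar "weights", and the length-based reward are all hypothetical. It shows the rollout, train, and weight-sync phases and the version check that catches stale rollouts.

```python
from dataclasses import dataclass

@dataclass
class Trainer:                       # stands in for the FSDP/Megatron trainer group
    version: int = 0
    weights: float = 0.0

    def update(self, rollouts: list[dict]) -> None:
        if any(r["version"] != self.version for r in rollouts):
            raise ValueError("stale rollouts: engine weights were not synced")
        self.weights += sum(r["reward"] for r in rollouts) / len(rollouts)
        self.version += 1            # every update produces a new weight version

@dataclass
class RolloutEngine:                 # stands in for the vLLM/SGLang rollout group
    version: int = 0
    weights: float = 0.0

    def load(self, weights: float, version: int) -> None:
        self.weights, self.version = weights, version

    def generate(self, prompts: list[str]) -> list[dict]:
        # Toy reward: prompt length. Each rollout is tagged with the weight version.
        return [{"prompt": p, "reward": float(len(p)), "version": self.version}
                for p in prompts]

def run(steps: int, prompts: list[str]) -> tuple[Trainer, RolloutEngine]:
    trainer, engine = Trainer(), RolloutEngine()
    for _ in range(steps):
        rollouts = engine.generate(prompts)            # rollout phase
        trainer.update(rollouts)                       # train phase
        engine.load(trainer.weights, trainer.version)  # weight sync
    return trainer, engine

trainer, engine = run(3, ["ab", "cdef"])
assert (trainer.version, engine.version) == (3, 3)
```

Tagging rollouts with a weight version is one simple way to make a skipped sync fail loudly instead of silently training on off-policy data.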

How to run it on optimised hardware

  • RDMA into workers: expose IB/RoCE NICs to Ray worker pods/nodes so NCCL uses GPUDirect RDMA, not TCP. On KubeRay, request the RDMA resource in the worker container (e.g. rdma/rdma_shared_device_a), exactly as for any pod (Kubernetes for GPUs).
  • NCCL: Ray Train collectives run over NCCL. Set NCCL_IB_HCA, NCCL_NET_GDR_LEVEL=SYS, optionally NCCL_NVLS_ENABLE for NVLink SHARP; confirm [GDRDMA] in NCCL_DEBUG=INFO (performance tuning). ACS must be off for P2P/GDR.
  • Placement groups for topology: STRICT_PACK to keep a collective inside one node's NVLink/NVSwitch domain; STRICT_SPREAD for one-rank-per-node. Bundles are not ordered by physical GPU rank, so pin device/affinity inside the worker, not by bundle index (the Blackwell platform).
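
The GDRDMA confirmation in the NCCL bullet is easy to automate. The sketch below scans captured NCCL_DEBUG=INFO text for the GPUDirect marker and for the socket transport. The marker strings `GDRDMA` and `NET/Socket` are assumptions about the log format, and the sample lines are hand-written stand-ins rather than captured NCCL output, so check both against the NCCL version in use.

```python
def nccl_transport_summary(log_text: str) -> dict[str, int]:
    """Count log lines that mention GPUDirect RDMA vs the TCP socket transport."""
    counts = {"gdrdma": 0, "socket": 0}
    for line in log_text.splitlines():
        if "GDRDMA" in line:
            counts["gdrdma"] += 1
        elif "NET/Socket" in line:
            counts["socket"] += 1
    return counts

def fabric_ok(log_text: str) -> bool:
    """True only if GDRDMA appears and no channel fell back to sockets."""
    c = nccl_transport_summary(log_text)
    return c["gdrdma"] > 0 and c["socket"] == 0

# Hand-written stand-in lines (illustrative shape only).
good = "worker0 NCCL INFO Channel 00/0 : 0[0] -> 1[0] [send] via NET/IB/0/GDRDMA\n"
bad = "worker0 NCCL INFO Channel 00/0 : 0[0] -> 1[0] [send] via NET/Socket/0\n"
assert fabric_ok(good)
assert not fabric_ok(bad)
assert not fabric_ok("")          # no evidence at all is also a failure
```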

How to run it in production

On Kubernetes, run Ray through the KubeRay CRDs so the cluster scheduler and GPU Operator keep governing it, and pick the CRD by lifecycle:

  • RayJob for run-to-completion training (the cookbook recipe below): shutdownAfterJobFinishes: true tears the cluster down when the job exits, so a finished run frees its GPUs.
  • RayService for online serving: it adds high availability and zero-downtime rolling upgrades in front of a Ray Serve app, which a bare RayCluster does not.
  • GCS fault tolerance: the head node's GCS is a single point of failure, so back it with external Redis; then a head restart recovers cluster state instead of dropping every actor. For serving, enable it on the cluster that backs the RayService.
  • Pin the image and expose RDMA at the pod: pin an explicit rayproject/ray:<ver>-gpu tag on head and workers (mismatched Ray versions across nodes fail to form a cluster), and request the RDMA resource in the worker container so NCCL gets GPUDirect RDMA rather than TCP.
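
A pre-deploy check for the image-pinning rule might look like the following. It assumes the `<repo>:<version>-<suffix>` tag layout used by the image names on this page; `ray_version` and `in_lockstep` are hypothetical helpers, not part of Ray or KubeRay.

```python
def ray_version(image: str) -> str:
    """Extract '<ver>' from '<repo>:<ver>-<suffix>' (tag layout assumed)."""
    _, sep, tag = image.rpartition(":")
    if not sep or not tag or "/" in tag:       # no tag, or the colon was a registry port
        raise ValueError(f"image {image!r} has no explicit tag")
    if tag in ("latest", "nightly"):
        raise ValueError(f"image {image!r} uses a floating tag; pin a version")
    return tag.split("-", 1)[0]

def in_lockstep(images: list[str]) -> bool:
    """True when every image carries the same Ray version."""
    return len({ray_version(i) for i in images}) == 1

head = "rayproject/ray:2.55.1-py311-gpu"
worker = "rayproject/ray:2.55.1-py311-gpu"
assert ray_version(head) == "2.55.1"
assert in_lockstep([head, worker])
assert not in_lockstep([head, "rayproject/ray:2.55.0-py311-gpu"])
```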

How to maintain it

  • Pin the version line. As of mid-2026 the stable line is Ray 2.55.x with a 3.0 dev line in master; verify the exact tag on docs.ray.io and keep head, workers, and the KubeRay operator in lockstep. For RL, treat the Ray build and the inference-engine build as one versioned unit.
  • Size the object store and watch spill. Large objects evicted from the in-memory object store to disk stall pipelines; set object_store_memory deliberately and alert on spill metrics rather than discovering spill through a throughput cliff.
  • Watch the head and GCS. Monitor head-node liveness and the GCS-backing Redis; a head loss without GCS fault tolerance drops the cluster.
  • Re-verify the fabric after changes. After any node, image, or NCCL-env change, re-confirm [GDRDMA] in NCCL_DEBUG=INFO from inside a worker and re-run nccl-tests, because a silent TCP fallback craters throughput without erroring (workload recipes).

Cookbook (common use cases)

Reference templates (each needs a running Ray plus the noted library); the resource math they rely on is covered by the validated numpy models above.

1. Ray Data batch GPU inference (offline scoring across workers):

import ray

class Scorer:                                   # actor-style stateful UDF
    def __init__(self) -> None:
        self.model = load_model()               # placeholder loader; runs once per worker
    def __call__(self, batch: dict) -> dict:
        batch["score"] = self.model(batch["text"])
        return batch

ds = ray.data.read_parquet("s3://bucket/prompts/")
ds.map_batches(Scorer, concurrency=8, num_gpus=1).write_parquet("s3://bucket/out/")
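
The callable-class contract can be exercised locally before handing the UDF to map_batches. The sketch below is a variant of Scorer with an injectable model; the stub model (string length as the score) and the `model` constructor argument are hypothetical additions for testing. Ray Data passes batches as a dict of column name to array; plain lists are used here for simplicity.

```python
class Scorer:
    def __init__(self, model=None) -> None:
        # Injectable for tests; the default stub scores each text by its length.
        self.model = model if model is not None else (
            lambda texts: [len(t) for t in texts])

    def __call__(self, batch: dict) -> dict:
        batch["score"] = self.model(batch["text"])   # add one output column
        return batch

scorer = Scorer()                                    # constructed once, reused per batch
out = scorer({"text": ["a", "bcd"]})
assert out["score"] == [1, 3]
assert out["text"] == ["a", "bcd"]                   # input columns pass through
```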

2. Ray Serve LLM deployment (run the build_openai_app result from the serve-inference section):

from ray import serve
serve.run(app, blocking=True)  # OpenAI-compatible API on :8000/v1
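
Once the app is up, any OpenAI-style client can call it. The standard-library sketch below only builds the request; the localhost base URL and the `chat_request` helper are assumptions for illustration, and the model name reuses the model_id from the serve-inference template.

```python
import json
import urllib.request

def chat_request(prompt: str, model: str = "qwen3-8b",
                 base_url: str = "http://localhost:8000/v1") -> urllib.request.Request:
    """Build (but do not send) a chat-completions request."""
    body = {"model": model, "messages": [{"role": "user", "content": prompt}]}
    return urllib.request.Request(
        url=f"{base_url}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = chat_request("hello")
assert req.full_url == "http://localhost:8000/v1/chat/completions"
# To send against a live deployment: urllib.request.urlopen(req)
```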

3. KubeRay RayJob (run-to-completion on Kubernetes; see Kubernetes):

apiVersion: ray.io/v1
kind: RayJob
metadata: { name: rl-train, namespace: ml }
spec:
  entrypoint: python train_rl.py
  shutdownAfterJobFinishes: true
  rayClusterSpec:
    headGroupSpec:
      template:
        spec: { containers: [{ name: ray-head, image: rayproject/ray:2.55.1-py311-gpu }] }
    workerGroupSpecs:
      - groupName: gpu-workers
        replicas: 8
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray:2.55.1-py311-gpu
                resources:
                  limits: { nvidia.com/gpu: 8, rdma/rdma_shared_device_a: 1 }

Failure modes

  • Workers without RDMA → Ray Train NCCL silently falls back to TCP; throughput craters. Validate with nccl-tests inside a worker (workload recipes).
  • Ungoverned second stack: running Ray beside Kubernetes without KubeRay duplicates scheduling and bypasses quota; prefer KubeRay so the cluster scheduler and GPU Operator govern Ray pods (orchestration overview).
  • GCS as a single point: a head-node loss drops the cluster unless GCS fault tolerance (external Redis) is configured; RayService adds HA for serving.
  • Bundle ≠ physical GPU order: never assume bundle index maps to GPU rank for NVLink locality; set device affinity in the worker.
  • Object store spill: large objects evicted to disk stall pipelines; size object_store_memory and watch spill metrics.
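
For the bundle-order failure mode, it helps to remember that a GPU worker only sees the devices listed in its CUDA_VISIBLE_DEVICES, so the process-local index 0 is whichever physical GPU was assigned. A minimal sketch of that mapping follows; `physical_gpu` is a hypothetical helper, and in a live worker the string would come from os.environ.

```python
def physical_gpu(logical_index: int, cuda_visible_devices: str) -> str:
    """Map a process-local CUDA index to the id listed in CUDA_VISIBLE_DEVICES."""
    visible = [d.strip() for d in cuda_visible_devices.split(",") if d.strip()]
    if not 0 <= logical_index < len(visible):
        raise IndexError(
            f"logical index {logical_index} not in visible set {visible}")
    return visible[logical_index]

assert physical_gpu(0, "3") == "3"        # device 0 in this worker is physical GPU 3
assert physical_gpu(1, "2,5") == "5"      # order follows the env var, not the bundle index
```

Logging this mapping at worker start makes it possible to verify NVLink locality from the actual assignment rather than inferring it from bundle order.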

References

  • Ray docs: https://docs.ray.io/en/latest/
  • KubeRay: https://docs.ray.io/en/latest/cluster/kubernetes/index.html · repo https://github.com/ray-project/kuberay
  • Ray Train (PyTorch): https://docs.ray.io/en/latest/train/getting-started-pytorch.html · TorchTrainer https://docs.ray.io/en/latest/train/api/doc/ray.train.torch.TorchTrainer.html
  • Ray Serve LLM: https://docs.ray.io/en/latest/serve/index.html · build_openai_app https://docs.ray.io/en/latest/serve/api/doc/ray.serve.llm.build_openai_app.html
  • Placement groups: https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html
  • Ray on Slurm: https://docs.ray.io/en/latest/cluster/vms/user-guides/community/slurm.html

Related: Inference · Orchestration · RL Libraries · Kubernetes · Slurm · SFT/LoRA · Glossary