.github/skills/ray-distributed-computing/SKILL.md
Comprehensive guide to using Ray for scalable distributed computing, including Ray Core, Data, Train, Tune, Serve, and RLlib with practical examples
Ray is an open-source unified framework for scaling AI and Python applications. It provides the compute layer for parallel processing, so you don't need to be a distributed systems expert. Ray reduces the complexity of running both individual distributed workloads and end-to-end machine learning workflows.
pip install -U ray
# Ray Data for data processing
pip install -U "ray[data]"
# Ray Train for distributed training
pip install -U "ray[train]"
# Ray Tune for hyperparameter tuning
pip install -U "ray[tune]"
# Ray Serve for model serving
pip install -U "ray[serve]"
# RLlib for reinforcement learning
pip install -U "ray[rllib]" torch
# All libraries
pip install -U "ray[all]"
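Ray's unified compute framework consists of three layers: the Ray AI Libraries (Data, Train, Tune, Serve, RLlib), Ray Core (tasks, actors, and objects), and Ray Clusters (worker nodes connected to a head node).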
import ray
# Initialize Ray
ray.init()
# Or Ray auto-initializes on first remote API call
Tasks are stateless functions that execute in parallel:
import ray
# Define a remote task
@ray.remote
def square(x):
    return x * x
# Launch four parallel square tasks
futures = [square.remote(i) for i in range(4)]
# Retrieve results
results = ray.get(futures)
print(results) # [0, 1, 4, 9]
Actors maintain internal state between method calls:
import ray
# Define a Counter actor
@ray.remote
class Counter:
    def __init__(self):
        self.i = 0
    def get(self):
        return self.i
    def incr(self, value):
        self.i += value
# Create a Counter actor
c = Counter.remote()
# Submit calls to the actor
for _ in range(10):
    c.incr.remote(1)
# Retrieve final actor state
print(ray.get(c.get.remote())) # 10
Ray's distributed object store efficiently manages data:
import ray
import numpy as np
# Define a task that sums the values in a matrix
@ray.remote
def sum_matrix(matrix):
    return np.sum(matrix)
# Call with a literal argument
result = ray.get(sum_matrix.remote(np.ones((100, 100))))
print(result) # 10000.0
# Put a large array into the object store
matrix_ref = ray.put(np.ones((1000, 1000)))
# Call with the object reference
result = ray.get(sum_matrix.remote(matrix_ref))
print(result) # 1000000.0
Ray Data provides flexible and performant APIs for batch inference, data preprocessing, and data loading for ML training. It features a streaming execution engine for efficiently processing large datasets.
import ray
import pandas as pd
class ClassificationModel:
    def __init__(self):
        from transformers import pipeline
        self.pipe = pipeline("text-classification")
    def __call__(self, batch: pd.DataFrame):
        results = self.pipe(list(batch["text"]))
        result_df = pd.DataFrame(results)
        return pd.concat([batch, result_df], axis=1)
# Read data
ds = ray.data.read_text("s3://anonymous@ray-example-data/sms_spam_collection_subset.txt")
# Apply batch transformation
ds = ds.map_batches(
    ClassificationModel,
    compute=ray.data.ActorPoolStrategy(size=2),
    batch_size=64,
    batch_format="pandas",
    # num_gpus=1  # Enable for GPU workers
)
# Show results
ds.show(limit=1)
Ray Train is a scalable machine learning library for distributed training and fine-tuning. It supports PyTorch, PyTorch Lightning, HuggingFace Transformers, TensorFlow, Keras, XGBoost, LightGBM, and more.
| PyTorch Ecosystem | More Frameworks |
|-------------------|-----------------|
| PyTorch | TensorFlow |
| PyTorch Lightning | Keras |
| Hugging Face Transformers | Horovod |
| Hugging Face Accelerate | XGBoost |
| DeepSpeed | LightGBM |
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
def train_func(config):
    # Your training logic here
    import torch
    import torch.nn as nn
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    # Training loop
    for epoch in range(10):
        # Training code
        pass
# Configure trainer
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,
        use_gpu=True
    )
)
# Run training
result = trainer.fit()
Ray Tune is a Python library for experiment execution and hyperparameter tuning at any scale. It supports PyTorch, XGBoost, TensorFlow, Keras, and state-of-the-art algorithms like Population Based Training (PBT) and HyperBand/ASHA.
from ray import tune
def objective(config):
    # Objective function to minimize
    score = config["a"] ** 2 + config["b"]
    return {"score": score}
# Define search space
search_space = {
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}
# Create and run tuner
tuner = tune.Tuner(objective, param_space=search_space)
results = tuner.fit()
# Get best result
best_result = results.get_best_result(metric="score", mode="min")
print(best_result.config)
Ray Serve is a scalable model serving library for building online inference APIs. It's framework-agnostic and particularly well-suited for model composition and multi-model serving.
import requests
from starlette.requests import Request
from typing import Dict
from ray import serve
# Define a Ray Serve application
@serve.deployment
class MyModelDeployment:
    def __init__(self, msg: str):
        # Initialize model state
        self._msg = msg
    def __call__(self, request: Request) -> Dict:
        return {"result": self._msg}
app = MyModelDeployment.bind(msg="Hello world!")
# Deploy the application locally
serve.run(app, route_prefix="/")
# Query the application
response = requests.get("http://localhost:8000/")
print(response.json()) # {'result': 'Hello world!'}
import requests
import starlette
from typing import Dict
from ray import serve
from ray.serve.handle import DeploymentHandle
@serve.deployment
class Adder:
    def __init__(self, increment: int):
        self.increment = increment
    def add(self, inp: int):
        return self.increment + inp
@serve.deployment
class Combiner:
    def average(self, *inputs) -> float:
        return sum(inputs) / len(inputs)
@serve.deployment
class Ingress:
    def __init__(
        self,
        adder1: DeploymentHandle,
        adder2: DeploymentHandle,
        combiner: DeploymentHandle,
    ):
        self._adder1 = adder1
        self._adder2 = adder2
        self._combiner = combiner
    async def __call__(self, request: starlette.requests.Request) -> Dict[str, float]:
        input_json = await request.json()
        final_result = await self._combiner.average.remote(
            self._adder1.add.remote(input_json["val"]),
            self._adder2.add.remote(input_json["val"]),
        )
        return {"result": final_result}
# Build and deploy the application
app = Ingress.bind(
    Adder.bind(increment=1),
    Adder.bind(increment=2),
    Combiner.bind()
)
serve.run(app)
# Query the application
response = requests.post("http://localhost:8000/", json={"val": 100.0})
print(response.json()) # {"result": 101.5}
RLlib is an open-source library for reinforcement learning, offering support for production-level, highly scalable, and fault-tolerant RL workloads with simple and unified APIs.
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.connectors.env_to_module import FlattenObservations
from pprint import pprint
# Configure the algorithm
config = (
    PPOConfig()
    .environment("Taxi-v3")
    .env_runners(
        num_env_runners=2,
        # Observations are discrete (ints) -> flatten (one-hot) them
        env_to_module_connector=lambda env: FlattenObservations(),
    )
    .evaluation(evaluation_num_env_runners=1)
)
# Build the algorithm
algo = config.build_algo()
# Train for 5 iterations
for _ in range(5):
    pprint(algo.train())
# Evaluate
pprint(algo.evaluate())
# Release resources
algo.stop()
Ray supports deployment on:
Process large batches of data through ML models efficiently:
import ray
@ray.remote
class ModelInference:
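    def __init__(self):
        # Load model (load_model is a placeholder for your own loading code)
        self.model = load_model()
    def predict(self, batch):
        return self.model.predict(batch)
# Create actors
actors = [ModelInference.remote() for _ in range(4)]
# Distribute work round-robin so that every batch is assigned,
# even when there are more batches than actors
batches = split_data_into_batches(data)  # placeholder helper and input
futures = [
    actors[i % len(actors)].predict.remote(batch)
    for i, batch in enumerate(batches)
]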
# Collect results
results = ray.get(futures)
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
def train_func(config):
    # Distributed training logic
    pass
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
result = trainer.fit()
from ray import tune
def training_function(config):
    # Training with hyperparameters from config
    # (train_model is a placeholder for your own training code)
    accuracy = train_model(lr=config["lr"], batch_size=config["batch_size"])
    return {"accuracy": accuracy}
tuner = tune.Tuner(
    training_function,
    param_space={
        "lr": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([32, 64, 128]),
    },
    tune_config=tune.TuneConfig(num_samples=10),
)
results = tuner.fit()
best_config = results.get_best_result(metric="accuracy", mode="max").config
from ray import serve
@serve.deployment
class ModelServer:
    def __init__(self):
        self.model = load_model()
    async def __call__(self, request):
        data = await request.json()
        predictions = self.model.predict(data["input"])
        return {"predictions": predictions.tolist()}
serve.run(ModelServer.bind())
import ray
# Specify resources for tasks
@ray.remote(num_cpus=2, num_gpus=1)
def gpu_task(data):
    return process_on_gpu(data)  # placeholder for your own GPU code
# Specify resources for actors (a fractional GPU lets actors share a device)
@ray.remote(num_cpus=1, num_gpus=0.5)
class Model:
    pass
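import numpy as np
import ray
# Use ray.put() for large objects
large_data = np.random.rand(1000000)
data_ref = ray.put(large_data)
# Pass the reference instead of the data. Ray resolves a top-level
# ObjectRef argument to its value, so the task must not call ray.get() on it
@ray.remote
def process(data):
    return compute(data)  # placeholder for your own computation
result_ref = process.remote(data_ref)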
import ray
@ray.remote
def may_fail(x):
    if x < 0:
        raise ValueError("Negative input")
    return x * 2
# Handle failures
try:
    result = ray.get(may_fail.remote(-1))
except ray.exceptions.RayTaskError as e:
    print(f"Task failed: {e}")
# Access Ray dashboard
# Default at http://localhost:8265
# Get cluster resources
resources = ray.cluster_resources()
print(resources)
# Get task/actor status (the state API lives in ray.util.state)
from ray.util.state import list_actors, list_tasks
actors = list_actors()
tasks = list_tasks()
ray.put() and object references
# Start Ray with custom resources
ray.init(resources={"special_hardware": 4})
@ray.remote(resources={"special_hardware": 1})
def specialized_task():
    pass
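import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# Create a placement group for co-located tasks
pg = placement_group([{"CPU": 2}, {"CPU": 2}], strategy="STRICT_PACK")
ray.get(pg.ready())  # Block until the bundles are reserved
@ray.remote
def task():
    pass
# Schedule tasks in the placement group
strategy = PlacementGroupSchedulingStrategy(placement_group=pg)
ray.get([task.options(scheduling_strategy=strategy).remote() for _ in range(4)])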
# Enable verbose logging
ray.init(logging_level="debug")
# Use ray.timeline() for performance analysis
ray.timeline(filename="timeline.json")
import ray
from ray import tune, serve
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
# 1. Data preprocessing with Ray Data
ds = ray.data.read_parquet("s3://bucket/data.parquet")
ds = ds.map_batches(preprocess_function)
# 2. Model training with Ray Train
def train_func(config):
    # Training logic
    pass
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
result = trainer.fit()
# 3. Hyperparameter tuning with Ray Tune
tuner = tune.Tuner(
    trainer,
    param_space={"train_loop_config": {"lr": tune.loguniform(1e-4, 1e-1)}}
)
tuning_results = tuner.fit()
# 4. Model serving with Ray Serve
@serve.deployment
class PredictionService:
    def __init__(self):
        self.model = load_best_model(tuning_results)
    async def __call__(self, request):
        data = await request.json()
        return {"prediction": self.model.predict(data)}
serve.run(PredictionService.bind())
Ray provides a comprehensive framework for distributed computing and AI workloads. Its unified API makes it easy to:
Start with Ray Core for general distributed computing, then leverage specialized libraries (Data, Train, Tune, Serve, RLlib) for ML-specific workloads.