cli-tool/components/skills/ai-research/data-processing-ray-data/SKILL.md
--- name: ray-data description: Scalable data processing for ML workloads. Streaming execution across CPU/GPU, supports Parquet/CSV/JSON/images. Integrates with Ray Train, PyTorch, TensorFlow. Scales from single machine to 100s of nodes. Use for batch inference, data preprocessing, multi-modal data loading, or distributed ETL pipelines. version: 1.0.0 author: Orchestra Research license: MIT tags: [Data Processing, Ray Data, Distributed Computing, ML Pipelines, Batch Inference, ETL, Scalable, Ray
Distributed data processing library for ML and AI workloads.
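Use Ray Data when:
- Running batch inference over large datasets
- Preprocessing data for ML training
- Loading multi-modal data
- Building distributed ETL pipelines

Key features:
- Streaming execution across CPU and GPU
- Reads Parquet, CSV, JSON, and images
- Integrates with Ray Train, PyTorch, and TensorFlow
- Scales from a single machine to hundreds of nodes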
Use alternatives instead:
```bash
pip install -U 'ray[data]'
```
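```python
import ray

# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# Transform data (lazy execution).
# batch_format="pandas" makes each batch a DataFrame so the .str accessor
# is available; the default batch format is a dict of NumPy arrays.
ds = ds.map_batches(
    lambda batch: batch["text"].str.lower().to_frame("processed"),
    batch_format="pandas",
)

# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)
```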
```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Access this worker's shard of the dataset in training
    train_ds = ray.train.get_dataset_shard("train")

    for epoch in range(10):
        for batch in train_ds.iter_batches(batch_size=32):
            # Train on batch
            pass

# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
```
```python
import ray
import pandas as pd

# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# Images
ds = ray.data.read_images("s3://bucket/images/")

# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# From range
ds = ray.data.range(1000000)  # Synthetic data

# From pandas
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
```
```python
import numpy as np

# Batch transformation (fast)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)

# Row-by-row (slower)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)

# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)

# Group by column (assumes the dataset has a "category" column)
counts = ds.groupby("category").count()

# Custom aggregation: each output column must be array-like, one row per group
sums = ds.groupby("category").map_groups(
    lambda group: {"sum": np.array([group["value"].sum()])}
)
```
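The batch and row functions above see the same data in two layouts: a batch is a mapping from column name to a sequence of values, while a row is a single record. The sketch below illustrates that relationship in plain Python, using lists in place of the NumPy arrays Ray Data passes by default. The helper names (`rows_to_batch`, `batch_to_rows`, `double_batch`, `double_row`) are hypothetical and not part of Ray.

```python
def rows_to_batch(rows):
    """Pivot a list of row dicts into one column-oriented batch dict."""
    columns = {}
    for row in rows:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns


def batch_to_rows(batch):
    """Pivot a column-oriented batch dict back into a list of row dicts."""
    keys = list(batch)
    return [dict(zip(keys, values)) for values in zip(*batch.values())]


def double_batch(batch):
    # Batch-style: one call handles every value in the column.
    return {**batch, "doubled": [v * 2 for v in batch["value"]]}


def double_row(row):
    # Row-style: one call per record.
    return {**row, "doubled": row["value"] * 2}


rows = [{"id": i, "value": i * 2} for i in range(4)]
via_rows = [double_row(r) for r in rows]
via_batch = batch_to_rows(double_batch(rows_to_batch(rows)))
print(via_rows == via_batch)  # prints True
```

Both paths give the same result. The batch version is usually faster in practice because one function call covers many records and the work inside it can be vectorized.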
```python
# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch

    images = torch.tensor(batch["image"]).cuda()
    # GPU preprocessing
    processed = images * 255
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1,  # Request GPU
)
```
```python
# Write to Parquet
ds.write_parquet("s3://bucket/output/")

# Write to CSV
ds.write_csv("output/")

# Write to JSON
ds.write_json("output/")
```
```python
# Control parallelism
ds = ds.repartition(100)  # 100 blocks for 100-core cluster

# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000)  # vs batch_size=100

# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Streamed, not loaded to memory
```
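To make "streamed, not loaded to memory" concrete, here is a single-process analogy in plain Python. It is not how Ray Data is implemented, only a sketch of the idea that batches are pulled from a lazy source one at a time. The names `iter_batches` (a free function here, not the Ray method) and `synthetic_records` are hypothetical.

```python
from itertools import islice


def iter_batches(records, batch_size):
    """Yield lists of up to batch_size records from any iterable, lazily."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def synthetic_records(n):
    # Generator: records are produced on demand, never stored all at once.
    for i in range(n):
        yield {"id": i}


batch_sizes = [len(b) for b in iter_batches(synthetic_records(2500), batch_size=1000)]
print(batch_sizes)  # prints [1000, 1000, 500]
```

Only one batch exists in memory at any moment, so peak memory depends on `batch_size` rather than on the total number of records.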
```python
import ray

# Load model (MyModel is a placeholder for your own model class)
def load_model():
    return MyModel()

# Inference class: __init__ runs once per worker, __call__ once per batch
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(
    BatchInference,
    batch_size=32,
    num_gpus=1,     # one GPU per worker
    concurrency=2,  # number of workers; recent Ray releases require this for class-based UDFs
)
predictions.write_parquet("s3://output/")
```
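The reason for wrapping inference in a class is that construction happens once per worker while the call happens once per batch. The sketch below simulates a single worker in plain Python to show that split. `CountingModel` is a hypothetical stand-in for an expensive model, and nothing here calls Ray.

```python
class CountingModel:
    """Hypothetical stand-in for an expensive-to-load model."""

    loads = 0  # class-level counter of how many times the model was constructed

    def __init__(self):
        CountingModel.loads += 1

    def __call__(self, inputs):
        return [x * 2 for x in inputs]


class BatchInference:
    def __init__(self):
        # Runs once per worker: pay the load cost a single time.
        self.model = CountingModel()

    def __call__(self, batch):
        # Runs once per batch, reusing the already-loaded model.
        return {"prediction": self.model(batch["input"])}


worker = BatchInference()  # one simulated worker
outputs = [worker({"input": [i, i + 1]}) for i in range(3)]
print(CountingModel.loads)  # prints 1
print(outputs[-1])  # prints {'prediction': [4, 6]}
```

A plain function passed to `map_batches` would have to load the model on every call, or rely on a global, so the class form is the usual choice when setup is costly.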
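```python
# Multi-step pipeline (clean_data, tokenize, augment are your own batch functions).
# write_parquet triggers execution and returns None, so the result is not assigned.
(
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)
```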
```python
# Iterate as PyTorch tensors
for batch in ds.iter_torch_batches(batch_size=32):
    # batch is a dict mapping column name to torch.Tensor
    inputs, labels = batch["features"], batch["label"]
```
```python
# Convert to TensorFlow (the keyword is label_columns, plural)
tf_ds = ds.to_tf(feature_columns=["image"], label_columns="label", batch_size=32)
for features, labels in tf_ds:
    # Train model
    pass
```
| Format | Read | Write | Use Case |
|--------|------|-------|----------|
| Parquet | ✅ | ✅ | ML data (recommended) |
| CSV | ✅ | ✅ | Tabular data |
| JSON | ✅ | ✅ | Semi-structured |
| Images | ✅ | ❌ | Computer vision |
| NumPy | ✅ | ✅ | Arrays |
| Pandas | ✅ | ❌ | DataFrames |
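As one way to apply the table, the sketch below maps a file extension to the name of the matching `ray.data` reader. The lookup table and the `reader_for` helper are hypothetical conveniences, not part of Ray. The function returns the reader's name as a string, so the snippet runs without Ray installed.

```python
from pathlib import PurePosixPath

# File extension -> name of the ray.data reader for that format.
READERS = {
    ".parquet": "read_parquet",
    ".csv": "read_csv",
    ".json": "read_json",
    ".png": "read_images",
    ".jpg": "read_images",
    ".jpeg": "read_images",
    ".npy": "read_numpy",
}


def reader_for(path):
    """Return the ray.data reader name for a path, based on its extension."""
    suffix = PurePosixPath(path).suffix.lower()
    try:
        return READERS[suffix]
    except KeyError:
        raise ValueError(f"No reader registered for {suffix!r}") from None


print(reader_for("s3://bucket/data/part-0.parquet"))  # prints read_parquet
print(reader_for("s3://bucket/images/cat.JPG"))  # prints read_images
```

With Ray installed, the name can be resolved with `getattr(ray.data, reader_for(path))(path)`.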
Scaling (processing 100GB data):
GPU acceleration (image preprocessing):
Production deployments: