Scale RL training to large models through decoupled rollout and training workers with controlled data staleness.
Standard RL systems for language models synchronize all workers: generate N rollouts, wait for the slowest one to finish, train on the batch, repeat. The slowest rollout blocks everything. AReaL breaks this bottleneck by running rollout and training asynchronously. Generation workers produce data continuously while training workers consume it in parallel, and neither waits for the other. A modified PPO algorithm handles data staleness, which is the gap between when data was generated and when it is trained on. The paper reports up to a 2.77× training speedup over synchronous systems with the same number of GPUs, with matched or improved final performance.
This enables large-scale RL training for language reasoning on GPUs that would otherwise spend 30-40% of their time idle, waiting for synchronization.
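The sketch below gives a back-of-the-envelope view of where that idle time comes from. It computes the fraction of rollout-worker time spent waiting in one synchronous batch. Its assumptions are one trajectory per worker and a barrier at the slowest worker. It ignores training time and uses made-up timings. `sync_idle_fraction` is a hypothetical helper, not part of AReaL.

```python
from typing import List


def sync_idle_fraction(gen_times: List[float]) -> float:
    """Fraction of rollout-worker time spent idle in one synchronous batch.

    Assumes one trajectory per worker and that every worker is held at a
    barrier until the slowest one finishes.
    """
    if not gen_times or max(gen_times) <= 0:
        return 0.0
    slowest = max(gen_times)
    busy = sum(gen_times)              # time workers actually spend generating
    wall = slowest * len(gen_times)    # every worker is occupied until the slowest finishes
    return 1.0 - busy / wall


# Hypothetical long-tailed generation times (seconds) for four rollout workers
times = [2.0, 3.0, 4.0, 10.0]
print(f"Idle fraction: {sync_idle_fraction(times):.3f}")
```

In this example one trajectory takes 10 s while the others take 2 to 4 s. As a result, more than half of the batch's rollout capacity sits idle. An asynchronous design lets the three faster workers start new rollouts instead of waiting.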
Synchronous RL has serial bottlenecks: you can't train until all rollouts finish, and you can't generate until training finishes. AReaL decouples the two. It maintains a buffer of generated trajectories and trains on them continuously, accepting that training data is slightly stale (generated by an older model checkpoint). The key algorithmic change is a "Staleness-Enhanced PPO" variant. It adjusts importance weights based on how old the data is, which prevents divergence from stale training signals.
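One way to adjust importance weights for stale data is a decoupled PPO surrogate. It separates two policies:

- the stale behavior policy that generated a sample;
- a recent "proximal" policy that serves as the trust-region anchor.

The PyTorch sketch below shows an assumed form of that idea. The function name `decoupled_ppo_loss` and its arguments are hypothetical, and this is not necessarily AReaL's exact objective. The code later in this document uses a simpler clip-tightening heuristic instead.

```python
import torch


def decoupled_ppo_loss(
    logp_new: torch.Tensor,    # log pi_theta(a|s): policy being optimized (has grad)
    logp_prox: torch.Tensor,   # log pi_prox(a|s): recent proximal policy (trust-region anchor)
    logp_behav: torch.Tensor,  # log pi_behav(a|s): possibly stale policy that sampled the data
    advantages: torch.Tensor,
    clip_eps: float = 0.2,
) -> torch.Tensor:
    """Decoupled PPO surrogate (assumed form); returns a loss to minimize."""
    # Off-policy correction for staleness: reweight each sample by how likely the
    # proximal policy is to take this action relative to the stale behavior policy.
    behav_weight = torch.exp(logp_prox - logp_behav).detach()
    # The PPO ratio is measured against the proximal policy, so clipping bounds
    # the update relative to recent weights rather than the stale ones.
    ratio = torch.exp(logp_new - logp_prox.detach())
    clipped = torch.clamp(ratio, 1.0 - clip_eps, 1.0 + clip_eps)
    surrogate = torch.min(ratio * advantages, clipped * advantages)
    return -(behav_weight * surrogate).mean()


# Toy usage: three actions sampled by a stale policy
logp_behav = torch.log(torch.tensor([0.5, 0.4, 0.6]))
logp_prox = torch.log(torch.tensor([0.55, 0.35, 0.6]))
logp_new = logp_prox.clone().requires_grad_(True)
advantages = torch.tensor([1.0, -0.5, 2.0])

loss = decoupled_ppo_loss(logp_new, logp_prox, logp_behav, advantages)
loss.backward()
print(f"loss={loss.item():.4f}, grad={logp_new.grad}")
```

When the behavior and proximal policies coincide, the weight is 1 and the loss reduces to standard clipped PPO.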
This implementation demonstrates async RL with staleness-aware training.
Build the asynchronous trajectory buffer:
```python
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Trajectory:
    """Single rollout trajectory."""
    states: List
    actions: List
    rewards: List
    log_probs: List
    values: List
    generated_at_step: int  # Training step (policy version) that generated this rollout
    generation_time: float


@dataclass
class TrainingBatch:
    """Batch of trajectories for training."""
    trajectories: List[Trajectory]
    staleness: int  # Average training steps since generation (rounded down)


class AsynchronousReplayBuffer:
    """Thread-safe FIFO buffer that decouples rollout workers from training workers."""

    def __init__(self, max_size: int = 10000):
        self.buffer = deque(maxlen=max_size)  # oldest entries are dropped when full
        self.lock = threading.Lock()
        self.current_training_step = 0

    def add_trajectory(self, trajectory: Trajectory):
        """Add a generated trajectory (called by rollout workers)."""
        with self.lock:
            self.buffer.append(trajectory)

    def sample_batch(self, batch_size: int) -> Tuple[Optional[TrainingBatch], float]:
        """
        Pop the oldest `batch_size` trajectories (called by training workers).
        Returns (batch, average_staleness), or (None, 0.0) if there is not enough data.
        """
        with self.lock:
            if len(self.buffer) < batch_size:
                return None, 0.0
            # Sample without replacement, oldest first
            sampled = [self.buffer.popleft() for _ in range(batch_size)]
            # Staleness = training steps elapsed since each trajectory was generated
            staleness_list = [
                max(0, self.current_training_step - traj.generated_at_step)
                for traj in sampled
            ]
            avg_staleness = sum(staleness_list) / len(staleness_list)
            batch = TrainingBatch(trajectories=sampled, staleness=int(avg_staleness))
            return batch, avg_staleness

    def update_training_step(self, step: int):
        """Record training progress so staleness can be computed."""
        with self.lock:
            self.current_training_step = step

    def buffer_size(self) -> int:
        """Current buffer occupancy."""
        with self.lock:
            return len(self.buffer)


# Example usage
buffer = AsynchronousReplayBuffer(max_size=5000)

# Simulate a rollout worker adding trajectories (3 timesteps, 3-dim states)
for i in range(100):
    traj = Trajectory(
        states=[[1.0, 2.0, 3.0]] * 3,
        actions=[0, 1, 0],
        rewards=[1.0, 0.5, 1.0],
        log_probs=[-0.5, -0.3, -0.4],
        values=[0.8, 0.6, 0.9],
        generated_at_step=i // 25,  # pretend policy versions 0-3 produced these
        generation_time=time.time(),
    )
    buffer.add_trajectory(traj)

# Pretend training has advanced to step 4, so older rollouts are now stale
buffer.update_training_step(4)
batch, staleness = buffer.sample_batch(batch_size=32)
print(f"Batch size: {len(batch.trajectories)}, Average staleness: {staleness:.2f} steps")
```
Implement Staleness-Enhanced PPO:
```python
from typing import Tuple

import numpy as np
import torch
import torch.nn.functional as F
import torch.optim as optim


class StalenessEnhancedPPO:
    """PPO variant that handles stale data from async training.

    Simplification: staleness is handled only by tightening the clip range.
    """

    def __init__(self, model, learning_rate: float = 1e-4,
                 clip_ratio: float = 0.2, entropy_coef: float = 0.01):
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        self.clip_ratio = clip_ratio
        self.entropy_coef = entropy_coef

    def compute_staleness_factor(self, staleness: int) -> float:
        """
        Scale the PPO clip range by data age.
        Recent data (staleness=0): normal clipping.
        Stale data: tighter clipping to prevent divergence.
        """
        if staleness <= 0:
            return 1.0
        decay = 0.95 ** staleness  # older data -> narrower clip range
        return max(0.1, decay)     # never below 0.1x normal clipping

    def compute_policy_loss(self, logits: torch.Tensor, actions: torch.Tensor,
                            old_log_probs: torch.Tensor, advantages: torch.Tensor,
                            staleness: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Clipped PPO loss with a staleness-adjusted clip range.
        Returns (policy_loss, mean_entropy)."""
        dist = torch.distributions.Categorical(logits=logits)
        new_log_probs = dist.log_prob(actions)            # [B, T]
        ratio = torch.exp(new_log_probs - old_log_probs)  # pi_new / pi_behavior
        adjusted_clip = self.clip_ratio * self.compute_staleness_factor(staleness)
        clipped_ratio = torch.clamp(ratio, 1 - adjusted_clip, 1 + adjusted_clip)
        policy_loss = -torch.min(ratio * advantages, clipped_ratio * advantages).mean()
        return policy_loss, dist.entropy().mean()

    def train_step(self, batch: TrainingBatch) -> dict:
        """Single training step on a (possibly stale) batch."""
        trajs = batch.trajectories
        # Stack per-trajectory lists: states [B, T, state_dim], the rest [B, T]
        states = torch.as_tensor(np.array([t.states for t in trajs], dtype=np.float32))
        actions = torch.as_tensor(np.array([t.actions for t in trajs], dtype=np.int64))
        rewards = torch.as_tensor(np.array([t.rewards for t in trajs], dtype=np.float32))
        old_log_probs = torch.as_tensor(np.array([t.log_probs for t in trajs], dtype=np.float32))
        old_values = torch.as_tensor(np.array([t.values for t in trajs], dtype=np.float32))

        # Advantages (simplified: per-step reward as the return, no discounting or GAE)
        returns = rewards
        advantages = returns - old_values
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Forward pass with the current policy
        logits, values = self.model(states)  # [B, T, num_actions], [B, T]
        policy_loss, entropy = self.compute_policy_loss(
            logits, actions, old_log_probs, advantages, batch.staleness)
        # Value loss on the *current* value predictions so it produces gradients
        value_loss = F.mse_loss(values, returns)
        # Total loss (entropy bonus encourages exploration)
        total_loss = policy_loss + value_loss - self.entropy_coef * entropy

        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
        self.optimizer.step()
        return {
            "policy_loss": policy_loss.item(),
            "value_loss": value_loss.item(),
            "total_loss": total_loss.item(),
            "staleness": batch.staleness,
        }


class SimplePolicy(torch.nn.Module):
    """Toy actor-critic: 3-dim state -> action logits and a scalar value."""

    def __init__(self, state_dim: int = 3, num_actions: int = 2):
        super().__init__()
        self.policy_head = torch.nn.Linear(state_dim, num_actions)
        self.value_head = torch.nn.Linear(state_dim, 1)

    def forward(self, x: torch.Tensor):
        return self.policy_head(x), self.value_head(x).squeeze(-1)


model = SimplePolicy()
ppo = StalenessEnhancedPPO(model)

# Train on a stale batch from the buffer built above
batch, staleness = buffer.sample_batch(batch_size=32)
if batch is not None:
    stats = ppo.train_step(batch)
    print(f"Training stats: {stats}")
```
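Tabulating the `compute_staleness_factor` schedule shows the clip range PPO actually uses at different data ages. The schedule scales by 0.95 per step of staleness and is floored at 0.1. This standalone sketch re-implements it as a free function with the default clip ratio of 0.2. The name `effective_clip` is hypothetical.

```python
def effective_clip(staleness: int, clip_ratio: float = 0.2,
                   decay: float = 0.95, floor: float = 0.1) -> float:
    """Clip range after scaling by max(floor, decay ** staleness)."""
    if staleness <= 0:
        return clip_ratio
    return clip_ratio * max(floor, decay ** staleness)


for s in [0, 1, 5, 10, 20, 45, 100]:
    print(f"staleness={s:>3}  clip=±{effective_clip(s):.4f}")
```

Because 0.95^45 ≈ 0.099, the floor takes over at about 45 steps of staleness. Every batch at or beyond that age is trained with the same ±0.02 clip range, so the schedule stops distinguishing very old data from moderately old data.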
Build the async coordination system:
```python
import threading
import time

import numpy as np


class AsyncRLSystem:
    """Orchestrate rollout and training workers that communicate only through the buffer."""

    def __init__(self, model, num_rollout_workers: int = 4,
                 num_training_workers: int = 2, batch_size: int = 32):
        self.model = model
        self.buffer = AsynchronousReplayBuffer(max_size=5000)
        self.ppo = StalenessEnhancedPPO(model)
        self.num_rollout_workers = num_rollout_workers
        self.num_training_workers = num_training_workers
        self.batch_size = batch_size
        self.global_step = 0
        # Training workers share one model/optimizer in this toy version, so
        # updates and the step counter are serialized with a lock.
        self.train_lock = threading.Lock()
        self.stop_event = threading.Event()

    def rollout_worker(self, worker_id: int):
        """
        Worker thread: continuously generate rollouts.
        Doesn't wait for training; just fills the buffer.
        """
        print(f"Rollout worker {worker_id} started")
        while not self.stop_event.is_set():
            # Placeholder trajectory: 5 timesteps, 3-dim states, 2 actions
            traj = Trajectory(
                states=[np.random.randn(3) for _ in range(5)],
                actions=[np.random.randint(0, 2) for _ in range(5)],
                rewards=[np.random.rand() for _ in range(5)],
                log_probs=[np.log(0.5)] * 5,  # uniform behavior policy over 2 actions
                values=[np.random.rand() for _ in range(5)],
                generated_at_step=self.global_step,  # policy version used for generation
                generation_time=time.time(),
            )
            self.buffer.add_trajectory(traj)  # only holds the buffer lock briefly
            time.sleep(0.01)  # simulate rollout latency

    def training_worker(self, worker_id: int):
        """
        Worker thread: continuously train on available data.
        Doesn't wait for specific rollouts; processes whatever is in the buffer.
        """
        print(f"Training worker {worker_id} started")
        while not self.stop_event.is_set():
            batch, staleness = self.buffer.sample_batch(batch_size=self.batch_size)
            if batch is None:
                time.sleep(0.1)  # not enough data yet
                continue
            with self.train_lock:
                stats = self.ppo.train_step(batch)
                self.global_step += 1
                step = self.global_step
                self.buffer.update_training_step(step)
            if step % 100 == 0:
                print(f"Step {step}: "
                      f"Loss={stats['total_loss']:.4f}, "
                      f"Buffer={self.buffer.buffer_size()}, "
                      f"Staleness={staleness:.2f}")

    def run(self, num_steps: int = 1000, timeout_s: float = 60.0):
        """Run until num_steps updates have been made or timeout_s seconds pass."""
        rollout_threads = [
            threading.Thread(target=self.rollout_worker, args=(i,), daemon=True)
            for i in range(self.num_rollout_workers)
        ]
        training_threads = [
            threading.Thread(target=self.training_worker, args=(i,), daemon=True)
            for i in range(self.num_training_workers)
        ]
        start_time = time.time()
        for t in rollout_threads + training_threads:
            t.start()

        while self.global_step < num_steps and (time.time() - start_time) < timeout_s:
            time.sleep(0.5)

        # Cleanup
        self.stop_event.set()
        for t in rollout_threads + training_threads:
            t.join(timeout=5)
        print(f"Async RL completed: {self.global_step} steps in "
              f"{time.time() - start_time:.1f}s")


# Run async system (reuses `model` from the PPO example)
system = AsyncRLSystem(model, num_rollout_workers=4, num_training_workers=2)
system.run(num_steps=200)
```
| Aspect | Details |
|--------|---------|
| Rollout/Training Ratio | Aim for 4:2 (4 rollout, 2 training workers); adjust based on hardware |
| Buffer Size | 2000-10000 trajectories typical; larger buffer handles more staleness |
| Max Staleness | 50-100 training steps; beyond this, data diverges too far from current policy |
| Worker Placement | Spread workers across GPUs; use CPU for rollout if available |
| Monitoring | Track buffer occupancy; if consistently full or empty, rebalance worker counts |
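The staleness bound can also be enforced on the rollout side instead of by discarding old data. Before starting a new trajectory, check whether it would land in a training batch more than `max_staleness` policy versions ahead of the current one. If it would, pause generation. The sketch below assumes trajectories are consumed FIFO in fixed-size batches, with one policy version per batch. The helper name `may_start_rollout` is hypothetical. The rule is a simplified reading of the idea of controlling staleness by balancing rollout and training, not AReaL's exact implementation.

```python
def may_start_rollout(num_generated: int, batch_size: int,
                      policy_version: int, max_staleness: int) -> bool:
    """Return True if one more trajectory can be generated within max_staleness.

    num_generated:  trajectories generated so far (the new one gets this 0-based index)
    batch_size:     trajectories consumed per training step (FIFO order)
    policy_version: training steps completed so far (weights used for generation)
    """
    # Index of the training step that will consume the new trajectory
    consuming_step = num_generated // batch_size
    # Its staleness when trained is consuming_step - policy_version; cap it
    return consuming_step - policy_version <= max_staleness


# With batch_size=32 and max_staleness=4, rollouts at version 0 may run 5 batches ahead
allowed = sum(may_start_rollout(n, 32, 0, 4) for n in range(200))
print(allowed)  # prints 160
```

With `max_staleness=0` this degenerates to synchronous training: no rollout for the next batch starts until the current policy version has been trained on.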
When to Use:
When NOT to Use:
Common Pitfalls:
AReaL: A Large-Scale Asynchronous Reinforcement Learning System for Language Reasoning https://arxiv.org/abs/2505.24298