skills/skillxiv-v0.0.2-claude-opus-4.6/diagonal-batching-recurrent-transformers/SKILL.md
Unlocks parallelism in recurrent memory transformers through diagonal batching of the layers-segments grid, achieving 3.3x speedup on 131K-token sequences without model retraining.
npx skillsauth add ADu2021/skillXiv diagonal-batching-recurrent-transformersInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Recurrent Memory Transformers (RMTs) achieve linear time and constant memory complexity theoretically, but their sequential execution on GPUs undermines practical performance. The core bottleneck isn't algorithmic complexity—it's scheduling constraints that prevent GPU parallelism. Diagonal batching reorganizes the computation grid (layers × segments) into independent diagonals that can execute concurrently, unlocking substantial speedups without requiring model retraining or architectural changes.
The following code demonstrates the diagonal batching algorithm:
import torch
import torch.nn as nn
from typing import List, Tuple
class DiagonalBatchingScheduler:
"""
Reorganizes RMT computation from sequential to parallel-executable diagonals.
"""
def __init__(self, num_layers: int, num_segments: int):
self.num_layers = num_layers
self.num_segments = num_segments
def compute_diagonals(self) -> List[List[Tuple[int, int]]]:
"""
Generate list of diagonals where each diagonal contains
(layer, segment) pairs that can execute in parallel.
Constraint: segment_i depends on segment_{i-1} and layer_{l} depends on layer_{l-1}
Therefore: positions with constant (segment + layer) can run in parallel.
"""
diagonals = []
# Iterate over all possible diagonal indices
for diagonal_idx in range(self.num_layers + self.num_segments - 1):
diagonal = []
# For each position (layer, segment), check if it's on this diagonal
for layer_idx in range(self.num_layers):
segment_idx = diagonal_idx - layer_idx
# Valid if segment is in valid range
if 0 <= segment_idx < self.num_segments:
diagonal.append((layer_idx, segment_idx))
if diagonal:
diagonals.append(diagonal)
return diagonals
def create_grouped_weights(self, model_layers: List[nn.Module]) -> torch.Tensor:
"""
Stack layer weights for grouped GEMM execution.
Instead of [L, H, D, D] (individual layers), produces stacked weights
suitable for batched matrix multiplication.
"""
stacked_weights = []
for layer in model_layers:
# Assume each layer has a weight matrix (e.g., attention projection)
w = layer.weight.data
stacked_weights.append(w)
# Stack along batch dimension for grouped GEMM
grouped = torch.stack(stacked_weights, dim=0)
return grouped
def grouped_gemm(self, inputs: torch.Tensor, grouped_weights: torch.Tensor,
batch_indices: List[int]) -> torch.Tensor:
"""
Execute grouped GEMM: multiple matrix multiplications in one kernel.
inputs: (B, D) or (B*num_selected_layers, D)
grouped_weights: (num_layers, D, D) or similar
batch_indices: which layers to execute
Returns: stacked outputs for all selected layers
"""
selected_weights = grouped_weights[batch_indices]
# Reshape for batched multiplication
B = inputs.shape[0] // len(batch_indices)
D = inputs.shape[-1]
# Batched matrix multiplication
# Using einsum for clarity: b (batch), l (layer), d (dimension)
inputs_reshaped = inputs.reshape(len(batch_indices), B, D)
outputs = torch.einsum('lbx,lxy->lby', inputs_reshaped, selected_weights)
return outputs.reshape(-1, D)
class DiagonalRMT(nn.Module):
"""
RMT with diagonal batching for efficient inference.
"""
def __init__(self, num_layers: int, segment_length: int, hidden_dim: int = 1024):
super().__init__()
self.num_layers = num_layers
self.segment_length = segment_length
self.hidden_dim = hidden_dim
# Build standard RMT layers
self.layers = nn.ModuleList([
nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=8, dim_feedforward=4*hidden_dim,
batch_first=True, norm_first=True)
for _ in range(num_layers)
])
self.scheduler = DiagonalBatchingScheduler(num_layers, -1) # -1 = dynamic segments
self.grouped_weights = None
def prepare_grouped_weights(self):
"""Pre-compute grouped weights for efficient execution."""
self.grouped_weights = self.scheduler.create_grouped_weights(self.layers)
def forward_sequential(self, x: torch.Tensor, memory: List[torch.Tensor]) -> Tuple[torch.Tensor, List[torch.Tensor]]:
"""Original sequential RMT forward (for comparison)."""
outputs = []
for layer_idx, layer in enumerate(self.layers):
x = layer(x)
memory[layer_idx] = x.clone()
outputs.append(x)
return x, memory
def forward_diagonal(self, x: torch.Tensor, memory: List[torch.Tensor],
num_segments: int) -> Tuple[torch.Tensor, List[torch.Tensor]]:
"""
Diagonal batching forward pass.
Reorganizes computation to run diagonals in parallel.
"""
# Reshape input into segments
B, T, D = x.shape
num_actual_segments = (T + self.segment_length - 1) // self.segment_length
# Generate diagonal schedule
diagonals = self.scheduler.compute_diagonals()
self.scheduler.num_segments = num_actual_segments
diagonals = self.scheduler.compute_diagonals()
# Process by diagonals
segment_hidden_states = []
for seg_idx in range(num_actual_segments):
seg_start = seg_idx * self.segment_length
seg_end = min(seg_start + self.segment_length, T)
segment = x[:, seg_start:seg_end, :]
# Process this segment through all layers (using diagonal batching)
for diag_idx, diagonal in enumerate(diagonals):
# Collect all (layer, segment) pairs in this diagonal relevant to current segment
relevant_pairs = [(l, s) for l, s in diagonal if s == seg_idx]
if relevant_pairs:
layer_indices = [l for l, _ in relevant_pairs]
# Execute grouped GEMM for these layers
if self.grouped_weights is not None:
# This is simplified; actual implementation would handle memory states
for layer_idx in layer_indices:
segment = self.layers[layer_idx](segment)
segment_hidden_states.append(segment)
# Concatenate all segments
output = torch.cat(segment_hidden_states, dim=1)
return output, memory
def forward(self, x: torch.Tensor, use_diagonal: bool = True) -> torch.Tensor:
"""Main forward pass with optional diagonal batching."""
if self.grouped_weights is None:
self.prepare_grouped_weights()
memory = [torch.zeros(x.shape[0], x.shape[1], self.hidden_dim, device=x.device)
for _ in range(self.num_layers)]
if use_diagonal:
output, _ = self.forward_diagonal(x, memory, num_segments=-1)
else:
output, _ = self.forward_sequential(x, memory)
return output
Diagonal Dependency Analysis: The key constraint is that computation at (layer l, segment s) depends only on (layer l-1, segment s) and (layer l, segment s-1). This creates the diagonal structure; verify your RMT variant satisfies this before applying diagonal batching.
GroupedGEMM Kernel Implementation: Modern frameworks like Triton or custom CUDA kernels can implement grouped matrix multiplication efficiently. For standard PyTorch, use torch.einsum or batched operations with proper reshaping.
Memory State Management: Keep per-layer memory states (recurrent state) on GPU between diagonal executions. Don't copy back to CPU; synchronize only at the end of inference.
Segment Length Tuning: Smaller segments (e.g., 512 tokens) create more parallelism but increase kernel launch overhead. Larger segments (e.g., 4096 tokens) reduce overhead but serialize more. Benchmark for your specific hardware.
Numerical Stability: Diagonal batching uses exact recurrence—numerical error is identical to sequential execution. No special stabilization required.
Hardware Compatibility: Works on any GPU supporting grouped matrix multiplication. H100, A100 show best results; older GPUs may have less efficient grouped GEMM kernels.
Diagonal batching achieves exceptional performance improvements:
The technique is particularly valuable for deploying long-context models in production where latency is critical, as it requires no model retraining or architectural modifications—it's a pure scheduling optimization.
testing
Uses flow maps as look-ahead operators to enable principled reward-guided diffusion by predicting trajectory endpoints at any denoising step. Deploy when applying rewards or preferences to diffusion trajectories with meaningful gradients throughout generation.
testing
Train language models where each expert learns independently on closed datasets, enabling flexible inference with selective data inclusion or exclusion. 41% performance improvement while allowing users to opt out of specific data sources without retraining.
data-ai
Understand how token generation flexibility in diffusion LMs paradoxically constrains reasoning, as models exploit ordering flexibility to avoid uncertain tokens, and apply simplified approaches that preserve parallel decoding benefits. Use when optimizing diffusion-based language models for reasoning tasks.
devops
Enable LLM agents to improve continuously during deployment by constructing structured experience libraries through self-reflection on successes and failures—achieving 23% improvement on reasoning without gradient-based parameter updates or external training.