skills/skillxiv-v0.0.2-claude-opus-4.6/deep-prune/SKILL.md
Reduce redundant tokens in parallel reasoning by 80% while maintaining accuracy via dynamic pruning of equivalent reasoning paths. Trigger: improve efficiency of consensus-based reasoning (multiple CoT generation).
npx skillsauth add ADu2021/skillXiv deep-pruneInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
When generating multiple reasoning chains (consensus sampling), over 80% of tokens are wasted on duplicate reasoning paths that reach the same conclusion. DeepPrune introduces a judge model that predicts answer equivalence from incomplete traces, allowing dynamic pruning of redundant paths before completion. The approach achieves 80%+ token reduction while maintaining accuracy within 3 percentage points.
The key insight: A lightweight classifier can identify when parallel reasoning traces will converge to the same answer before they finish generating.
Define what makes two reasoning paths equivalent.
class ReasoningEquivalenceAnalyzer:
"""
Determine when different reasoning traces reach the same conclusion.
"""
@staticmethod
def extract_answer(trace):
"""Extract final answer from reasoning trace."""
# Look for common answer markers
if "Answer:" in trace:
return trace.split("Answer:")[-1].strip().split('\n')[0]
elif "Therefore," in trace:
return trace.split("Therefore,")[-1].strip().split('\n')[0]
else:
# Use last meaningful line
lines = [l.strip() for l in trace.split('\n') if l.strip()]
return lines[-1] if lines else ""
@staticmethod
def are_answers_equivalent(answer1, answer2):
"""
Check if two answers are semantically equivalent.
"""
# Exact match
if answer1.lower() == answer2.lower():
return True
# Numeric equivalence
try:
num1 = float(answer1.replace(',', ''))
num2 = float(answer2.replace(',', ''))
return abs(num1 - num2) < 1e-6
except:
pass
# Jaccard similarity for text answers
tokens1 = set(answer1.lower().split())
tokens2 = set(answer2.lower().split())
if len(tokens1 | tokens2) == 0:
return False
jaccard = len(tokens1 & tokens2) / len(tokens1 | tokens2)
return jaccard > 0.8
Build a classifier that predicts answer equivalence from partial traces.
class JudgeModel:
"""
Predict if incomplete reasoning traces will reach equivalent answers.
"""
def __init__(self, model, hidden_size=256):
self.model = model
# Simple MLP judge: embedding → classification
self.embedding_layer = torch.nn.Linear(768, hidden_size) # BERT-like
self.hidden_layer = torch.nn.Linear(hidden_size, 128)
self.output_layer = torch.nn.Linear(128, 1) # Binary classification
def embed_trace(self, partial_trace):
"""
Embed a partial reasoning trace.
"""
# Use pretrained encoder
tokens = tokenize(partial_trace)
embeddings = self.model.encode(tokens)
# Mean pooling over sequence
trace_embedding = torch.mean(embeddings, dim=0)
return trace_embedding
def predict_equivalence(self, trace1, trace2):
"""
Predict: will these traces reach the same answer?
Args:
trace1, trace2: Partial reasoning traces
Returns:
Probability [0, 1] that answers will be equivalent
"""
# Embed both traces
emb1 = self.embed_trace(trace1)
emb2 = self.embed_trace(trace2)
# Concatenate embeddings
combined = torch.cat([emb1, emb2], dim=-1)
# Classify
hidden = torch.relu(self.embedding_layer(combined))
hidden = torch.relu(self.hidden_layer(hidden))
logit = self.output_layer(hidden)
prob = torch.sigmoid(logit)
return prob.item()
def train_on_dataset(self, reasoning_pairs, num_epochs=10):
"""
Train judge on pairs of complete reasoning traces.
Args:
reasoning_pairs: List of (trace1, trace2, are_equivalent)
"""
optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
# Compute class weights for focal loss (handle imbalance)
equiv_count = sum(1 for _, _, equiv in reasoning_pairs if equiv)
total_count = len(reasoning_pairs)
pos_weight = (total_count - equiv_count) / (equiv_count + 1)
for epoch in range(num_epochs):
epoch_loss = 0
for trace1, trace2, are_equiv in reasoning_pairs:
# Embed traces
emb1 = self.embed_trace(trace1)
emb2 = self.embed_trace(trace2)
combined = torch.cat([emb1, emb2], dim=-1)
# Forward pass
hidden = torch.relu(self.embedding_layer(combined))
hidden = torch.relu(self.hidden_layer(hidden))
logit = self.output_layer(hidden)
prob = torch.sigmoid(logit)
# Focal loss: emphasize hard examples
target = float(are_equiv)
bce_loss = torch.nn.functional.binary_cross_entropy(
prob,
torch.tensor(target)
)
# Focal term
pt = prob if target == 1 else (1 - prob)
focal_loss = -(1 - pt) ** 2 * bce_loss
# Backprop
optimizer.zero_grad()
focal_loss.backward()
optimizer.step()
epoch_loss += focal_loss.item()
print(f"Epoch {epoch}: loss={epoch_loss / len(reasoning_pairs):.4f}")
Prune redundant traces during generation.
class DynamicPruner:
"""
Prune redundant reasoning traces on-the-fly.
"""
def __init__(self, judge_model, redundancy_threshold=0.85):
self.judge = judge_model
self.threshold = redundancy_threshold
def should_prune_trace(self, current_traces, new_trace):
"""
Decide if new_trace is redundant with existing ones.
Args:
current_traces: List of active reasoning traces
new_trace: New trace to evaluate
Returns:
Boolean: should prune this trace?
"""
for existing_trace in current_traces:
# Compare at partial level
equivalence_prob = self.judge.predict_equivalence(
existing_trace,
new_trace
)
if equivalence_prob > self.threshold:
# New trace is redundant with existing one
return True
# Not redundant with any existing trace
return False
def prune_and_cluster(self, all_traces):
"""
Group traces into equivalence clusters.
Args:
all_traces: All completed reasoning traces
Returns:
List of equivalence clusters (representative + size)
"""
clusters = []
for trace in all_traces:
assigned = False
for cluster in clusters:
# Check equivalence with cluster representative
equiv_prob = self.judge.predict_equivalence(
cluster["representative"],
trace
)
if equiv_prob > self.threshold:
cluster["size"] += 1
assigned = True
break
if not assigned:
# New cluster
clusters.append({
"representative": trace,
"size": 1
})
return clusters
Generate multiple traces in parallel, pruning redundant ones.
class PrunedParallelReasoner:
"""
Generate multiple reasoning traces with dynamic pruning.
"""
def __init__(self, model, judge_model, num_parallel=4):
self.model = model
self.pruner = DynamicPruner(judge_model)
self.num_parallel = num_parallel
def generate_with_pruning(self, problem, max_tokens=500):
"""
Generate multiple reasoning traces, pruning redundant ones.
Args:
problem: Problem to reason about
max_tokens: Max tokens per trace
Returns:
Dictionary with unique traces and redundancy stats
"""
active_traces = [""] * self.num_parallel
token_counts = [0] * self.num_parallel
completed_traces = []
pruned_count = 0
# Generate tokens in rounds
for token_round in range(max_tokens):
for trace_idx in range(len(active_traces)):
if active_traces[trace_idx] is None:
continue # Already pruned
# Generate next token
next_token = self.model.generate_one_token(
problem + active_traces[trace_idx],
temperature=0.8
)
active_traces[trace_idx] += next_token
token_counts[trace_idx] += 1
# Check for completion
if is_complete_solution(active_traces[trace_idx]):
completed_traces.append(active_traces[trace_idx])
active_traces[trace_idx] = None
# Pruning step: every 50 tokens, check for redundancy
if token_round % 50 == 0:
for idx in range(len(active_traces)):
if active_traces[idx] is None:
continue
# Check if redundant with completed traces
for completed in completed_traces:
equiv_prob = self.pruner.judge.predict_equivalence(
active_traces[idx],
completed
)
if equiv_prob > self.pruner.threshold:
# Prune this trace
active_traces[idx] = None
pruned_count += 1
break
# Collect remaining active traces
for trace in active_traces:
if trace is not None:
completed_traces.append(trace)
# Cluster identical answers
clusters = self.pruner.prune_and_cluster(completed_traces)
# Calculate efficiency
total_tokens_generated = sum(token_counts)
unique_traces = len(clusters)
savings_ratio = (total_tokens_generated - sum(
len(c["representative"].split()) * c["size"]
for c in clusters
)) / total_tokens_generated
return {
"traces": [c["representative"] for c in clusters],
"cluster_sizes": [c["size"] for c in clusters],
"total_tokens": total_tokens_generated,
"pruned_traces": pruned_count,
"savings_ratio": savings_ratio
}
Measure pruning effectiveness.
def evaluate_deep_prune(model, judge_model, benchmark_dataset):
"""
Measure token savings and accuracy impact.
"""
reasoner = PrunedParallelReasoner(model, judge_model, num_parallel=8)
total_tokens_baseline = 0
total_tokens_pruned = 0
correct_count = 0
for problem in benchmark_dataset:
# With pruning
result = reasoner.generate_with_pruning(problem)
# Get consensus answer from most-represented cluster
largest_cluster = max(
zip(result["traces"], result["cluster_sizes"]),
key=lambda x: x[1]
)
consensus_answer = extract_answer(largest_cluster[0])
if evaluate_correctness(consensus_answer, problem):
correct_count += 1
# Count tokens
# Baseline: generate 8 full traces (max_tokens each)
total_tokens_baseline += 500 * 8
# Pruned: actual tokens generated
total_tokens_pruned += result["total_tokens"]
accuracy = correct_count / len(benchmark_dataset) * 100
savings = (1 - total_tokens_pruned / total_tokens_baseline) * 100
print(f"Accuracy: {accuracy:.1f}%")
print(f"Token savings: {savings:.1f}%")
print(f"Tokens per problem: baseline={total_tokens_baseline/len(benchmark_dataset):.0f}, "
f"pruned={total_tokens_pruned/len(benchmark_dataset):.0f}")
return {
"accuracy": accuracy,
"token_savings_percent": savings
}
Hyperparameters:
When to Use:
When NOT to Use:
DeepPrune: Parallel Scaling without Redundancy — arXiv:2510.08483
testing
Uses flow maps as look-ahead operators to enable principled reward-guided diffusion by predicting trajectory endpoints at any denoising step. Deploy when applying rewards or preferences to diffusion trajectories with meaningful gradients throughout generation.
testing
Train language models where each expert learns independently on closed datasets, enabling flexible inference with selective data inclusion or exclusion. 41% performance improvement while allowing users to opt out of specific data sources without retraining.
data-ai
Understand how token generation flexibility in diffusion LMs paradoxically constrains reasoning, as models exploit ordering flexibility to avoid uncertain tokens, and apply simplified approaches that preserve parallel decoding benefits. Use when optimizing diffusion-based language models for reasoning tasks.
devops
Enable LLM agents to improve continuously during deployment by constructing structured experience libraries through self-reflection on successes and failures—achieving 23% improvement on reasoning without gradient-based parameter updates or external training.