skills/skillxiv-v0.0.2-claude-opus-4.6/discrete-diffusion-faster-inference/SKILL.md
Enables diffusion LLMs to achieve 2.5× faster inference than autoregressive models through block-wise generation with parallel inter-block decoding.
npx skillsauth add ADu2021/skillXiv discrete-diffusion-faster-inference
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
Discrete Diffusion Forcing (D2F) lets diffusion-based LLMs run inference faster than autoregressive (AR) models by combining block-wise autoregressive generation, which makes KV caching possible, with parallel decoding across blocks. The hybrid keeps the parallelism of diffusion models while retaining the efficiency mechanisms of AR models.
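One way to picture the hybrid is through its attention pattern. The sketch below assumes block-wise causal attention, where a token sees every token in its own block and in all earlier blocks but nothing in later blocks. The helper name `block_causal_mask` and the toy sizes are illustrative and are not taken from the paper's code.

```python
def block_causal_mask(seq_len, block_size):
    """Return mask[i][j] == True when position i may attend to position j.

    Assumption: attention is bidirectional inside a block and causal
    across blocks, so only the block indices are compared.
    """
    return [
        [(j // block_size) <= (i // block_size) for j in range(seq_len)]
        for i in range(seq_len)
    ]


mask = block_causal_mask(seq_len=6, block_size=2)
for row in mask:
    print("".join("x" if allowed else "." for allowed in row))
# xx....
# xx....
# xxxx..
# xxxx..
# xxxxxx
# xxxxxx
```

Because earlier blocks never attend to later ones, their keys and values do not change once a block is finished, which is what makes a KV cache usable.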
Step 1: Design Block-Wise Generation
Implement block-based generation:
import torch
import torch.nn.functional as F


class BlockWiseGenerator:
    """Block-by-block generation that reuses the KV cache across blocks.

    Assumes a Hugging Face-style interface: the tokenizer returns `input_ids`,
    and model(input_ids, past_key_values=..., use_cache=True) returns an
    object with `.logits` and `.past_key_values`. Batch size is 1.
    """

    def __init__(self, diffusion_model, tokenizer, block_size=32):
        self.model = diffusion_model
        self.tokenizer = tokenizer
        self.block_size = block_size

    def generate_with_blocks(self, prompt, max_blocks=10):
        """Generate text in blocks with KV caching."""
        # Work on token IDs and decode to text only once, at the end
        input_ids = self.tokenizer(prompt, return_tensors="pt").input_ids
        generated_tokens = []
        kv_cache = None
        for _ in range(max_blocks):
            # Generate a single block
            block = self._generate_block(input_ids, kv_cache)
            generated_tokens.extend(block["tokens"])
            # The returned cache already covers the prompt and all earlier
            # blocks, so no separate merge step is needed
            kv_cache = block["cache"]
            # Only the newest token still has to be fed to the model
            input_ids = block["last_token"]
        return self.tokenizer.decode(generated_tokens)

    @torch.no_grad()
    def _generate_block(self, input_ids, kv_cache):
        """Generate a single block, sampling autoregressively within it."""
        block_tokens = []
        for _ in range(self.block_size):
            # Reuse the keys/values cached from previous blocks and steps
            outputs = self.model(input_ids, past_key_values=kv_cache, use_cache=True)
            kv_cache = outputs.past_key_values
            # AR sampling within the block
            logits = outputs.logits[:, -1, :]
            next_token = torch.multinomial(F.softmax(logits, dim=-1), 1)
            block_tokens.append(next_token.item())
            input_ids = next_token
        return {"tokens": block_tokens, "cache": kv_cache, "last_token": next_token}
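To see why the KV cache matters for block-wise generation, it helps to count how many token positions the model has to process. The sketch below is a back-of-the-envelope model, assuming one forward pass per sampled token; the function name `tokens_processed` and the example sizes are hypothetical.

```python
def tokens_processed(prompt_len, new_tokens, use_cache):
    """Count token positions fed through the model while sampling.

    With a cache, the prompt is encoded once and every later step feeds
    only the most recently sampled token. Without a cache, every step
    re-encodes the whole prefix.
    """
    if new_tokens == 0:
        return 0
    if use_cache:
        return prompt_len + (new_tokens - 1)
    return sum(prompt_len + step for step in range(new_tokens))


print(tokens_processed(prompt_len=16, new_tokens=32, use_cache=True))   # 47
print(tokens_processed(prompt_len=16, new_tokens=32, use_cache=False))  # 1008
```

The gap grows quadratically with the number of generated tokens, which is why losing the cache is so costly for long outputs.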
Step 2: Implement Parallel Inter-Block Decoding
Enable parallel block prediction:
import concurrent.futures

import torch
import torch.nn.functional as F


class ParallelBlockDecoder:
    """Decodes several blocks concurrently with a thread pool.

    Assumes a Hugging Face-style tokenizer and a model whose output
    exposes `.logits`. Batch size is 1.
    """

    def __init__(self, diffusion_model, tokenizer, num_parallel_blocks=3, block_size=32):
        self.model = diffusion_model
        self.tokenizer = tokenizer
        self.num_parallel = num_parallel_blocks
        self.block_size = block_size

    def generate_parallel(self, prompt, max_blocks=20):
        """Generate multiple blocks in parallel."""
        all_blocks = {}  # block index -> finished block
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_parallel) as executor:
            # Submit the first batch of blocks
            first_batch = min(self.num_parallel, max_blocks)
            pending = {
                executor.submit(self._generate_block_parallel, prompt, idx): idx
                for idx in range(first_batch)
            }
            next_idx = first_batch
            while pending:
                # Block until at least one future finishes instead of busy-waiting
                done, _ = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for future in done:
                    all_blocks[pending.pop(future)] = future.result()
                    # Submit the next block, conditioned on the blocks finished so far
                    if next_idx < max_blocks:
                        prefix = prompt + self._construct_prefix(all_blocks)
                        new_future = executor.submit(
                            self._generate_block_parallel, prefix, next_idx
                        )
                        pending[new_future] = next_idx
                        next_idx += 1
        # Concatenate the blocks in index order
        return self._construct_prefix(all_blocks)

    @torch.no_grad()
    def _generate_block_parallel(self, prompt, block_idx):
        """Generate one block (runs inside a worker thread)."""
        input_ids = self.tokenizer(prompt, return_tensors="pt").input_ids
        block_tokens = []
        for _ in range(self.block_size):
            logits = self.model(input_ids).logits[:, -1, :]
            next_token = torch.multinomial(F.softmax(logits, dim=-1), 1)
            block_tokens.append(next_token.item())
            # No cache here, so the sampled token is appended to the prefix
            input_ids = torch.cat([input_ids, next_token], dim=-1)
        return {"text": self.tokenizer.decode(block_tokens), "block_idx": block_idx}

    def _construct_prefix(self, completed_blocks):
        """Construct the prefix text from completed blocks, in index order."""
        return "".join(completed_blocks[idx]["text"] for idx in sorted(completed_blocks))
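The benefit of overlapping blocks can be estimated with a toy scheduling model. The sketch below is not the paper's decoder: it assumes every block needs the same number of decoding steps, that a block may start once its predecessor has run for `start_after` steps (with `start_after` no larger than `steps_per_block`), and that there is enough hardware to run all active blocks at once. All names and numbers are hypothetical.

```python
def sequential_steps(num_blocks, steps_per_block):
    """Steps needed when each block waits for the previous one to finish."""
    return num_blocks * steps_per_block


def pipelined_steps(num_blocks, steps_per_block, start_after):
    """Steps needed when block k starts at step k * start_after.

    The last block starts at (num_blocks - 1) * start_after and then
    runs for steps_per_block steps.
    """
    if num_blocks == 0:
        return 0
    return (num_blocks - 1) * start_after + steps_per_block


print(sequential_steps(num_blocks=8, steps_per_block=4))                # 32
print(pipelined_steps(num_blocks=8, steps_per_block=4, start_after=1))  # 11
```

Setting `start_after` equal to `steps_per_block` recovers the sequential count, so the parameter controls how aggressively blocks overlap.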
Step 3: Implement Asymmetric Distillation
Train D2F models:
import torch
import torch.nn.functional as F
from torch.optim import AdamW


class AsymmetricDistillation:
    def __init__(self, teacher_model, student_model):
        self.teacher = teacher_model
        self.student = student_model

    def distill(self, training_data, num_epochs=3):
        """Distill teacher to student with D2F training."""
        optimizer = AdamW(self.student.parameters(), lr=1e-4)
        self.teacher.eval()   # the teacher is frozen
        self.student.train()
        for epoch in range(num_epochs):
            total_loss = 0.0
            for batch in training_data:
                # Teacher generates the reference distribution
                with torch.no_grad():
                    teacher_logits = self.teacher(batch["input_ids"]).logits
                # Student predicts on the same inputs
                student_logits = self.student(batch["input_ids"]).logits
                # KL(teacher || student); flattening to (tokens, vocab) makes
                # 'batchmean' average over tokens rather than sequences
                vocab_size = student_logits.size(-1)
                loss = F.kl_div(
                    F.log_softmax(student_logits, dim=-1).reshape(-1, vocab_size),
                    F.softmax(teacher_logits, dim=-1).reshape(-1, vocab_size),
                    reduction="batchmean",
                )
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.student.parameters(), 1.0)
                optimizer.step()
                total_loss += loss.item()
            print(f"Epoch {epoch + 1}: Loss = {total_loss / len(training_data):.4f}")
        return self.student
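The distillation loss is a KL divergence from the teacher's distribution to the student's. A small pure-Python version, assuming a single token position and a three-word vocabulary, shows what is being minimized; the helper names and logits are made up for illustration.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def kl_divergence(teacher_logits, student_logits):
    """KL(teacher || student) for one token position."""
    p = softmax(teacher_logits)
    q = softmax(student_logits)
    return sum(pi * (math.log(pi) - math.log(qi)) for pi, qi in zip(p, q))


teacher = [2.0, 0.5, -1.0]
print(kl_divergence(teacher, teacher))          # 0.0
print(kl_divergence(teacher, [0.0, 0.0, 0.0]) > 0)  # True
```

The loss is zero only when the student reproduces the teacher's distribution, and it weights each vocabulary entry by the teacher's probability, so the student is penalized most where the teacher is confident.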
Step 4: Benchmark Inference Speed
Measure speedup:
import time

import numpy as np


class InferenceSpeedBenchmark:
    def __init__(self, ar_model, diffusion_model, d2f_model):
        self.ar = ar_model
        self.diffusion = diffusion_model
        self.d2f = d2f_model

    @staticmethod
    def _avg_time(generate_fn, prompts):
        """Average wall-clock seconds per prompt for one generation function."""
        times = []
        for prompt in prompts:
            start = time.perf_counter()
            generate_fn(prompt)
            times.append(time.perf_counter() - start)
        return float(np.mean(times))

    def benchmark(self, prompts, max_tokens=256):
        """Compare inference speeds."""
        avg_times = {
            # AR baseline
            "ar": self._avg_time(
                lambda p: self.ar.generate(p, max_length=max_tokens), prompts
            ),
            # Pure diffusion
            "diffusion": self._avg_time(
                lambda p: self.diffusion.generate(p, max_length=max_tokens), prompts
            ),
            # D2F (parallel blocks); 32 is the decoder's block size
            "d2f": self._avg_time(
                lambda p: self.d2f.generate_parallel(p, max_blocks=max_tokens // 32),
                prompts,
            ),
        }
        # tokens_per_sec assumes every call emits exactly max_tokens tokens
        results = {
            name: {"avg_time": avg, "tokens_per_sec": max_tokens / avg}
            for name, avg in avg_times.items()
        }
        results["d2f"]["speedup_vs_ar"] = avg_times["ar"] / avg_times["d2f"]
        return results
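Throughput figures are only comparable when they use the number of tokens each model actually produced, since a model may stop before reaching the token budget. The sketch below shows one way to summarize measurements under that assumption. The helper names are hypothetical, and the timings and token counts are made-up inputs chosen to keep the arithmetic easy to follow, not measured results.

```python
def summarize(times, token_counts):
    """Aggregate per-prompt timings (seconds) and generated-token counts."""
    total_time = sum(times)
    total_tokens = sum(token_counts)
    return {
        "avg_time": total_time / len(times),
        "tokens_per_sec": total_tokens / total_time,
    }


def speedup(baseline, candidate):
    """Ratio of candidate throughput to baseline throughput."""
    return candidate["tokens_per_sec"] / baseline["tokens_per_sec"]


# Illustrative numbers only
ar = summarize(times=[2.0, 2.0], token_counts=[200, 200])
d2f = summarize(times=[1.0, 1.0], token_counts=[250, 250])
print(ar["tokens_per_sec"], d2f["tokens_per_sec"], speedup(ar, d2f))
# 100.0 250.0 2.5
```

Comparing tokens per second rather than raw wall-clock time keeps the ratio meaningful even when the models emit different numbers of tokens for the same prompt.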
Hyperparameters and Configuration:
When to Use D2F:
When NOT to Use:
Implementation Notes:
Paper: Diffusion LLMs Can Do Faster-Than-AR Inference via Discrete Diffusion Forcing
ArXiv: 2508.09192
Performance: 2.5× faster than LLaMA3/Qwen2.5, 50× faster than vanilla diffusion LLMs