toolchains/ai/techniques/session-compression/SKILL.md
AI session compression techniques for managing multi-turn conversations efficiently through summarization, embedding-based retrieval, and intelligent context management.
npx skillsauth add bobmatnyc/claude-mpm-skills session-compression
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
Compress long AI conversations to fit context windows while preserving critical information.
Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.
Key Benefits:
Use session compression when:
Don't use when:
Ideal scenarios:
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
from anthropic import Anthropic
# Initialize Claude client
llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
api_key="your-api-key"
)
# Setup memory with automatic summarization
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=2000, # Summarize when exceeding this
return_messages=True
)
# Add conversation turns
memory.save_context(
{"input": "What's session compression?"},
{"output": "Session compression reduces conversation token usage..."}
)
# Retrieve compressed context
context = memory.load_memory_variables({})
from anthropic import Anthropic
client = Anthropic(api_key="your-api-key")
class ProgressiveCompressor:
def __init__(self, thresholds=(0.70, 0.85, 0.95)):  # tuple avoids a shared mutable default
self.thresholds = thresholds
self.messages = []
self.max_tokens = 200000 # Claude context window
def add_message(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
# Check if compression needed
current_usage = self._estimate_tokens()
usage_ratio = current_usage / self.max_tokens
if usage_ratio >= self.thresholds[0]:
self._compress(level=self._get_compression_level(usage_ratio))
def _estimate_tokens(self):
return sum(len(m["content"]) // 4 for m in self.messages)
def _get_compression_level(self, ratio):
for i, threshold in enumerate(self.thresholds):
if ratio < threshold:
return i
return len(self.thresholds)
def _compress(self, level: int):
"""Apply compression based on severity level."""
if level == 1: # 70% threshold: Light compression
self._remove_redundant_messages()
elif level == 2: # 85% threshold: Medium compression
self._summarize_old_messages(keep_recent=10)
else: # 95% threshold: Aggressive compression
self._summarize_old_messages(keep_recent=5)
def _remove_redundant_messages(self):
"""Remove duplicate or low-value messages."""
# Implementation: Use semantic deduplication
pass
def _summarize_old_messages(self, keep_recent: int):
"""Summarize older messages, keep recent ones verbatim."""
if len(self.messages) <= keep_recent:
return
# Messages to summarize
to_summarize = self.messages[:-keep_recent]
recent = self.messages[-keep_recent:]
# Generate summary
conversation_text = "\n\n".join([
f"{m['role'].upper()}: {m['content']}"
for m in to_summarize
])
response = client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=500,
messages=[{
"role": "user",
"content": f"Summarize this conversation:\n\n{conversation_text}"
}]
)
# Replace old messages with summary
summary = {
"role": "system",
"content": f"[Summary]\n{response.content[0].text}"
}
self.messages = [summary] + recent
# Usage
compressor = ProgressiveCompressor()
for i in range(100):
compressor.add_message("user", f"Message {i}")
compressor.add_message("assistant", f"Response {i}")
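The `_remove_redundant_messages` step in `ProgressiveCompressor` is left as a placeholder. As a rough stand-in, the sketch below drops near-duplicate messages using lexical similarity from the standard library's `difflib` rather than embeddings, so it is not true semantic deduplication. The function name `remove_near_duplicates` and the 0.9 threshold are assumptions chosen for illustration.

```python
from difflib import SequenceMatcher


def _normalize(text: str) -> str:
    """Lowercase and collapse whitespace so trivial variations compare equal."""
    return " ".join(text.lower().split())


def remove_near_duplicates(messages: list, threshold: float = 0.9) -> list:
    """Keep the first occurrence of each message; drop later near-copies.

    Similarity is lexical (difflib ratio), not semantic, and messages are
    only compared against earlier messages with the same role.
    """
    kept = []
    kept_norm = []  # (role, normalized content) for every kept message
    for msg in messages:
        norm = _normalize(msg["content"])
        duplicate = any(
            role == msg["role"]
            and SequenceMatcher(None, norm, prev).ratio() >= threshold
            for role, prev in kept_norm
        )
        if not duplicate:
            kept.append(msg)
            kept_norm.append((msg["role"], norm))
    return kept
```

Because every message is compared with every kept message, the cost grows quadratically with history length, which is usually acceptable for a few hundred messages.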
from anthropic import Anthropic
client = Anthropic(api_key="your-api-key")
# Build context with cache control
messages = [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Long conversation context here...",
"cache_control": {"type": "ephemeral"} # Cache this
}
]
},
{
"role": "assistant",
"content": "Previous response..."
},
{
"role": "user",
"content": "New question" # Not cached, changes frequently
}
]
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=messages
)
# Cache hit reduces costs by 90% for cached content
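To see what the 90% figure means over several requests, the sketch below compares the input cost of resending the same prefix with and without caching. The $3 per million input tokens price, the 1.25x cache-write multiplier, and the assumption that every call after the first is a cache hit are all illustrative assumptions; check current pricing before relying on the numbers.

```python
def uncached_prefix_cost(prefix_tokens: int, calls: int,
                         price_per_mtok: float = 3.0) -> float:
    """Input cost (USD) of sending the same prefix on every call, no caching."""
    return prefix_tokens / 1_000_000 * price_per_mtok * calls


def cached_prefix_cost(prefix_tokens: int, calls: int,
                       price_per_mtok: float = 3.0,
                       write_mult: float = 1.25,
                       read_mult: float = 0.10) -> float:
    """Input cost (USD) when the first call writes the cache and the rest read it.

    write_mult and read_mult are assumed multipliers on the base input price.
    """
    if calls < 1:
        return 0.0
    base = prefix_tokens / 1_000_000 * price_per_mtok
    return base * write_mult + base * read_mult * (calls - 1)


# Hypothetical workload: a 50,000-token prefix reused across 10 calls.
print(uncached_prefix_cost(50_000, 10))
print(cached_prefix_cost(50_000, 10))
```

The overall saving is lower than 90% because the first call pays a premium to write the cache; the gap narrows as the number of cache hits grows.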
Context window: Maximum tokens an LLM can process in a single request (input + output).
Current limits (2025):
Token estimation:
Why compression matters:
Compression ratio = Original tokens / Compressed tokens
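The ratio and the percentage of tokens saved are two views of the same quantity: a ratio of r removes a fraction 1 - 1/r of the original tokens. A minimal sketch, with function names chosen here for illustration:

```python
def compression_ratio(original_tokens: int, compressed_tokens: int) -> float:
    """Original tokens divided by compressed tokens."""
    if compressed_tokens <= 0:
        raise ValueError("compressed_tokens must be positive")
    return original_tokens / compressed_tokens


def token_reduction(ratio: float) -> float:
    """Fraction of tokens removed at a given compression ratio."""
    if ratio <= 0:
        raise ValueError("ratio must be positive")
    return 1 - 1 / ratio


print(compression_ratio(10_000, 500))   # 20.0
print(round(token_reduction(3), 3))     # 0.667
print(round(token_reduction(20), 3))    # 0.95
```

So a 3x ratio corresponds to roughly a 67% reduction and a 20x ratio to a 95% reduction.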
Industry benchmarks:
Target ratios by use case:
Industry standard pattern:
| Context Usage | Action | Technique |
|---------------|--------|-----------|
| 0-70% | No compression | Store verbatim |
| 70-85% | Light compression | Remove redundancy |
| 85-95% | Medium compression | Summarize old messages |
| 95-100% | Aggressive compression | Hierarchical + RAG |
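The tiers above map directly onto a small lookup. The sketch below assumes each boundary value belongs to the higher tier (70% triggers light compression), which matches the `>=` comparisons used elsewhere in this document; the function name is hypothetical.

```python
def compression_action(usage_ratio: float) -> str:
    """Map context-window usage (0.0 to 1.0) to a compression tier."""
    if usage_ratio < 0:
        raise ValueError("usage_ratio must be non-negative")
    if usage_ratio < 0.70:
        return "none"        # store verbatim
    if usage_ratio < 0.85:
        return "light"       # remove redundancy
    if usage_ratio < 0.95:
        return "medium"      # summarize old messages
    return "aggressive"      # hierarchical summaries + RAG


for ratio in (0.50, 0.70, 0.90, 0.97):
    print(ratio, compression_action(ratio))
```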
Implementation guidelines:
Selects key sentences/phrases without modification.
Pros: No hallucination, fast, deterministic
Cons: Limited compression (2-3x), may feel disjointed
Best for: Legal/compliance, short-term compression
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
def extractive_compress(messages: list, compression_ratio: float = 0.3):
"""Extract most important messages using TF-IDF scoring."""
texts = [msg['content'] for msg in messages]
# Calculate TF-IDF scores
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(texts)
scores = np.array(tfidf_matrix.sum(axis=1)).flatten()
# Select top messages
n_keep = max(1, int(len(messages) * compression_ratio))
top_indices = sorted(np.argsort(scores)[-n_keep:])
return [messages[i] for i in top_indices]
Uses LLMs to semantically condense conversation history.
Pros: Higher compression (5-10x), coherent, synthesizes information
Cons: Risk of hallucination, higher cost, less deterministic
Best for: General chat, customer support, multi-session continuity
from anthropic import Anthropic
def abstractive_compress(messages: list, client: Anthropic):
"""Generate semantic summary using Claude."""
conversation_text = "\n\n".join([
f"{msg['role'].upper()}: {msg['content']}"
for msg in messages
])
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items
Conversation:
{conversation_text}
Summary (aim for 1/5 the original length):"""
}]
)
return {
"role": "assistant",
"content": f"[Summary]\n{response.content[0].text}"
}
Creates summaries of summaries in a tree structure.
Pros: Extreme compression (20x+), handles 1M+ token conversations
Cons: Complex implementation, multiple LLM calls, information loss accumulates
Best for: Long-running conversations, multi-session applications
Architecture:
Level 0 (Raw): [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk): [Summary1-2] [Summary3-4] [Summary5-6] [Summary7-8]
Level 2 (Group): [Summary1-4] [Summary5-8]
Level 3 (Session): [Overall Session Summary]
from anthropic import Anthropic
from typing import List, Dict
class HierarchicalMemory:
def __init__(self, client: Anthropic, chunk_size: int = 10):
self.client = client
self.chunk_size = chunk_size
self.levels: List[List[Dict]] = [[]] # Level 0 = raw messages
def add_message(self, message: Dict):
"""Add message and trigger summarization if needed."""
self.levels[0].append(message)
if len(self.levels[0]) >= self.chunk_size * 2:
self._summarize_level(0)
def _summarize_level(self, level: int):
"""Summarize a level into the next higher level."""
messages = self.levels[level]
# Ensure next level exists
while len(self.levels) <= level + 1:
self.levels.append([])
# Summarize first chunk
chunk = messages[:self.chunk_size]
summary = self._generate_summary(chunk, level)
# Move to next level
self.levels[level + 1].append(summary)
self.levels[level] = messages[self.chunk_size:]
# Recursively check if next level needs summarization
if len(self.levels[level + 1]) >= self.chunk_size * 2:
self._summarize_level(level + 1)
def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
"""Generate summary for a chunk."""
conversation_text = "\n\n".join([
f"{msg['role'].upper()}: {msg['content']}"
for msg in messages
])
response = self.client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=300,
messages=[{
"role": "user",
"content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
}]
)
return {
"role": "system",
"content": f"[L{level+1} Summary] {response.content[0].text}",
"level": level + 1
}
def get_context(self, max_tokens: int = 4000) -> List[Dict]:
"""Retrieve context within token budget."""
context = []
token_count = 0
# Prioritize recent raw messages
for msg in reversed(self.levels[0]):
msg_tokens = len(msg['content']) // 4
if token_count + msg_tokens > max_tokens * 0.6:
break
context.insert(0, msg)
token_count += msg_tokens
# Add summaries from higher levels
for level in range(1, len(self.levels)):
for summary in self.levels[level]:
summary_tokens = len(summary['content']) // 4
if token_count + summary_tokens > max_tokens:
break
context.insert(0, summary)
token_count += summary_tokens
return context
Academic reference: "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)
Continuously compresses conversation with sliding window.
Pros: Low latency, predictable token usage, simple
Cons: Early details over-compressed, no information recovery
Best for: Real-time chat, streaming conversations
from anthropic import Anthropic
class RollingMemory:
def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
self.client = client
self.window_size = window_size
self.compress_threshold = compress_threshold
self.rolling_summary = None
self.recent_messages = []
def add_message(self, message: dict):
self.recent_messages.append(message)
if len(self.recent_messages) >= self.compress_threshold:
self._compress()
def _compress(self):
"""Compress older messages into rolling summary."""
messages_to_compress = self.recent_messages[:-self.window_size]
parts = []
if self.rolling_summary:
parts.append(f"Existing summary:\n{self.rolling_summary}")
parts.append("\nNew messages:\n" + "\n\n".join([
f"{msg['role']}: {msg['content']}"
for msg in messages_to_compress
]))
response = self.client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=400,
messages=[{
"role": "user",
"content": "\n".join(parts) + "\n\nUpdate the summary:"
}]
)
self.rolling_summary = response.content[0].text
self.recent_messages = self.recent_messages[-self.window_size:]
def get_context(self):
context = []
if self.rolling_summary:
context.append({
"role": "system",
"content": f"[Summary]\n{self.rolling_summary}"
})
context.extend(self.recent_messages)
return context
Store full conversation in vector database, retrieve only relevant chunks.
Pros: Extremely scalable, no information loss, high relevance
Cons: Requires vector DB infrastructure, retrieval latency
Best for: Knowledge bases, customer support with large history
from anthropic import Anthropic
from openai import OpenAI
import chromadb
class RAGMemory:
def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
self.anthropic = anthropic_client
self.openai = openai_client
# Initialize vector store
self.chroma = chromadb.Client()
self.collection = self.chroma.create_collection(
name="conversation",
metadata={"hnsw:space": "cosine"}
)
self.recent_messages = []
self.recent_window = 5
self.message_counter = 0
def add_message(self, message: dict):
"""Add to recent memory and vector store."""
self.recent_messages.append(message)
if len(self.recent_messages) > self.recent_window:
old_msg = self.recent_messages.pop(0)
self._store_in_vectordb(old_msg)
def _store_in_vectordb(self, message: dict):
"""Archive to vector database."""
# Generate embedding
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=message['content']
)
self.collection.add(
embeddings=[response.data[0].embedding],
documents=[message['content']],
metadatas=[{"role": message['role']}],
ids=[f"msg_{self.message_counter}"]
)
self.message_counter += 1
def retrieve_context(self, query: str, max_tokens: int = 4000):
"""Retrieve relevant context using RAG."""
context = []
token_count = 0
# 1. Recent messages (short-term memory)
for msg in self.recent_messages:
context.append(msg)
token_count += len(msg['content']) // 4
# 2. Retrieve relevant historical context
if token_count < max_tokens:
query_embedding = self.openai.embeddings.create(
model="text-embedding-3-small",
input=query
)
n_results = max(1, min(10, (max_tokens - token_count) // 100))  # never request 0 results
results = self.collection.query(
query_embeddings=[query_embedding.data[0].embedding],
n_results=n_results
)
for i, doc in enumerate(results['documents'][0]):
if token_count + len(doc) // 4 > max_tokens:
break
metadata = results['metadatas'][0][i]
context.insert(0, {
"role": metadata['role'],
"content": f"[Retrieved] {doc}"
})
token_count += len(doc) // 4
return context
Vector database options:
Group similar messages into clusters, represent with centroids.
Pros: Reduces redundancy, identifies themes, multi-topic handling
Cons: Requires sufficient data, may lose nuances
Best for: Multi-topic conversations, meeting summaries
from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np
class ClusteredMemory:
def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
self.client = openai_client
self.n_clusters = n_clusters
self.messages = []
self.embeddings = []
def add_messages(self, messages: list):
for msg in messages:
self.messages.append(msg)
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=msg['content']
)
self.embeddings.append(response.data[0].embedding)
def compress_by_clustering(self):
"""Cluster messages and return representatives."""
if len(self.messages) < self.n_clusters:
return self.messages
embeddings_array = np.array(self.embeddings)
kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
labels = kmeans.fit_predict(embeddings_array)
# Select message closest to each centroid
compressed = []
for cluster_id in range(self.n_clusters):
cluster_indices = np.where(labels == cluster_id)[0]
centroid = kmeans.cluster_centers_[cluster_id]
cluster_embeddings = embeddings_array[cluster_indices]
distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
closest_idx = cluster_indices[np.argmin(distances)]
compressed.append({
**self.messages[closest_idx],
"cluster_id": int(cluster_id),
"cluster_size": len(cluster_indices)
})
return compressed
Remove semantically similar messages that convey redundant information.
Pros: Reduces redundancy without losing unique content
Cons: Requires threshold tuning, O(n²) complexity
Best for: FAQ systems, repetitive conversations
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class SemanticDeduplicator:
def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
self.client = openai_client
self.threshold = similarity_threshold
def deduplicate(self, messages: list):
"""Remove semantically similar messages."""
if len(messages) <= 1:
return messages
# Generate embeddings
embeddings = []
for msg in messages:
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=msg['content']
)
embeddings.append(response.data[0].embedding)
embeddings_array = np.array(embeddings)
similarity_matrix = cosine_similarity(embeddings_array)
# Mark unique messages
keep_indices = []
for i in range(len(messages)):
is_unique = True
for j in keep_indices:
if similarity_matrix[i][j] > self.threshold:
is_unique = False
break
if is_unique:
keep_indices.append(i)
return [messages[i] for i in keep_indices]
Assign importance scores and retain only high-priority content.
Pros: Retains most important information, flexible criteria
Cons: Scoring is heuristic-based, may break flow
Best for: Mixed-importance conversations, filtering noise
import re
class MessagePrioritizer:
def score_message(self, msg: dict, index: int, total: int) -> float:
"""Calculate composite importance score."""
scores = []
# Length score (longer = more info)
scores.append(min(len(msg['content']) / 500, 1.0))
# Question score
if msg['role'] == 'user':
scores.append(min(msg['content'].count('?') * 0.5, 1.0))
# Entity score (capitalized words)
entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
scores.append(min(entities / 10, 1.0))
# Recency score (linear decay)
scores.append(index / max(total - 1, 1))
# Role score
scores.append(0.6 if msg['role'] == 'user' else 0.4)
return sum(scores) / len(scores)
def prioritize(self, messages: list, target_count: int):
"""Select top N messages by priority."""
scored = [
(msg, self.score_message(msg, i, len(messages)), i)
for i, msg in enumerate(messages)
]
scored.sort(key=lambda x: x[1], reverse=True)
top_messages = scored[:target_count]
top_messages.sort(key=lambda x: x[2]) # Restore chronological order
return [msg for msg, score, idx in top_messages]
Store only changes between consecutive messages.
Pros: Highly efficient for incremental changes
Cons: Reconstruction overhead, not suitable for all content
Best for: Code assistants with incremental edits
import difflib
class DeltaCompressor:
def __init__(self):
self.base_messages = []
self.deltas = []
def add_message(self, message: dict):
if not self.base_messages:
self.base_messages.append(message)
return
# Find most similar previous message
last_msg = self.base_messages[-1]
if last_msg['role'] == message['role']:
# Calculate delta
diff = list(difflib.unified_diff(
last_msg['content'].splitlines(),
message['content'].splitlines(),
lineterm=''
))
if len('\n'.join(diff)) < len(message['content']) * 0.7:
# Store as delta if compression achieved
self.deltas.append({
'base_index': len(self.base_messages) - 1,
'delta': diff,
'role': message['role']
})
return
# Store as new base message
self.base_messages.append(message)
def reconstruct(self):
"""Reconstruct full conversation from bases + deltas."""
messages = self.base_messages.copy()
for delta_info in self.deltas:
base_content = messages[delta_info['base_index']]['content']
# Apply diff to reconstruct (simplified)
reconstructed = base_content # Full implementation would apply diff
messages.append({
'role': delta_info['role'],
'content': reconstructed
})
return messages
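The `reconstruct` method above returns the base content unchanged because the standard library has no function that applies a unified diff. One way to get a reversible delta with only `difflib` is to store `SequenceMatcher` opcodes instead. The sketch below is an alternative encoding, not the format used by `DeltaCompressor`; `make_delta` and `apply_delta` are hypothetical names.

```python
from difflib import SequenceMatcher


def make_delta(base: str, new: str) -> list:
    """Encode `new` relative to `base` as copy/insert operations on lines."""
    base_lines = base.splitlines()
    new_lines = new.splitlines()
    matcher = SequenceMatcher(None, base_lines, new_lines, autojunk=False)
    delta = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == "equal":
            # Unchanged run: store only the line range in the base.
            delta.append(("copy", i1, i2))
        elif j2 > j1:
            # Replaced or inserted lines: store the new text itself.
            delta.append(("insert", new_lines[j1:j2]))
        # Pure deletions need no entry; the base lines are simply not copied.
    return delta


def apply_delta(base: str, delta: list) -> str:
    """Rebuild the new text from the base text and a delta from make_delta."""
    base_lines = base.splitlines()
    out = []
    for op in delta:
        if op[0] == "copy":
            out.extend(base_lines[op[1]:op[2]])
        else:
            out.extend(op[1])
    return "\n".join(out)


base = "def f(x):\n    return x + 1\n\nprint(f(1))"
new = "def f(x):\n    return x + 2\n\nprint(f(1))"
assert apply_delta(base, make_delta(base, new)) == new
```

Because the text is split with `splitlines()`, a trailing newline is not preserved; store it separately if it matters.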
Automatically summarizes conversation as it progresses.
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryMemory(llm=llm)
# Add conversation
memory.save_context(
{"input": "Hi, I'm working on a Python project"},
{"output": "Great! How can I help with your Python project?"}
)
# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])
Pros: Automatic summarization, simple API
Cons: Every turn triggers LLM call
Best for: Medium conversations (20-50 turns)
Hybrid: Recent messages verbatim, older summarized.
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=2000, # Summarize when exceeding
return_messages=True
)
# Add conversation
for i in range(50):
memory.save_context(
{"input": f"Question {i}"},
{"output": f"Answer {i}"}
)
# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})
Pros: Best balance of detail and compression
Cons: Requires token limit tuning
Best for: Most production applications
Maintains fixed token budget, drops oldest when exceeded.
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationTokenBufferMemory(
llm=llm,
max_token_limit=2000
)
# Simple FIFO when token limit exceeded
Pros: Predictable token usage, simple
Cons: Loses old information completely
Best for: Real-time chat with strict limits
Stores all messages in vector database, retrieves relevant ones.
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
# Automatically retrieves most relevant context
Pros: Infinite conversation length, semantic retrieval
Cons: Requires vector DB, retrieval overhead
Best for: Long-running conversations, knowledge bases
Cache static context to reduce token costs.
from anthropic import Anthropic
client = Anthropic(api_key="your-api-key")
# Long conversation context
conversation_history = [
{"role": "user", "content": "Message 1"},
{"role": "assistant", "content": "Response 1"},
# ... many more messages
]
# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
content = msg['content']
# Add cache control to last context message
if i == len(conversation_history) - 2:
messages.append({
"role": msg['role'],
"content": [
{
"type": "text",
"text": content,
"cache_control": {"type": "ephemeral"}
}
]
})
else:
messages.append(msg)
# Add new user message (not cached)
messages.append(conversation_history[-1])
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=messages
)
# Subsequent calls with same cached context cost 90% less
Cache TTL: 5 minutes
Savings: 90% cost reduction for cached tokens
Limits: Max 4 cache breakpoints per request
Best practices:
Use extended thinking to plan optimal compression strategy.
from anthropic import Anthropic
client = Anthropic(api_key="your-api-key")
response = client.messages.create(
model="claude-3-7-sonnet-20250219",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000
},
messages=[{
"role": "user",
"content": f"""Analyze this conversation and recommend compression:
{conversation_text}
Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x
Recommend optimal strategy."""
}]
)
# Access thinking process
thinking_content = [
block for block in response.content
if block.type == "thinking"
]
# Get compression recommendation
recommendation = response.content[-1].text
Save compression state for recovery and resume.
import json
import time  # needed for time.time() in save_checkpoint
from pathlib import Path
class PersistentMemory:
def __init__(self, checkpoint_dir: str = "./checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
self.memory = []
self.summary = None
def save_checkpoint(self, session_id: str):
"""Save current memory state."""
checkpoint = {
'messages': self.memory,
'summary': self.summary,
'timestamp': time.time()
}
checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
with open(checkpoint_file, 'w') as f:
json.dump(checkpoint, f, indent=2)
def load_checkpoint(self, session_id: str):
"""Load memory state from checkpoint."""
checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
if checkpoint_file.exists():
with open(checkpoint_file, 'r') as f:
checkpoint = json.load(f)
self.memory = checkpoint['messages']
self.summary = checkpoint.get('summary')
return True
return False
def auto_checkpoint(self, session_id: str, interval: int = 10):
"""Automatically save every N messages."""
if len(self.memory) % interval == 0:
self.save_checkpoint(session_id)
Continue conversations across sessions.
from anthropic import Anthropic
import json
import time  # needed for time.time() in _load_or_create
class ResumableConversation:
def __init__(self, client: Anthropic, session_id: str):
self.client = client
self.session_id = session_id
self.memory = self._load_or_create()
def _load_or_create(self):
"""Load existing session or create new."""
try:
with open(f'sessions/{self.session_id}.json', 'r') as f:
return json.load(f)
except FileNotFoundError:
return {
'messages': [],
'summary': None,
'created_at': time.time()
}
def add_turn(self, user_message: str):
"""Add user message and get response."""
# Add user message
self.memory['messages'].append({
'role': 'user',
'content': user_message
})
# Build context (with compression)
context = self._build_context()
# Get response
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
# context already ends with the new user message appended above
messages=context
)
# Save response
assistant_message = response.content[0].text
self.memory['messages'].append({
'role': 'assistant',
'content': assistant_message
})
# Compress if needed
if len(self.memory['messages']) > 20:
self._compress()
# Save state
self._save()
return assistant_message
def _build_context(self):
"""Build context with compression."""
context = []
# Add summary if exists
if self.memory['summary']:
context.append({
'role': 'user',  # the Messages API accepts only user/assistant roles in `messages`
'content': f"[Previous conversation summary]\n{self.memory['summary']}"
})
# Add recent messages
context.extend(self.memory['messages'][-10:])
return context
def _compress(self):
"""Compress older messages."""
if len(self.memory['messages']) < 15:
return
# Messages to summarize
to_summarize = self.memory['messages'][:-10]
# Generate summary
conversation_text = "\n\n".join([
f"{msg['role']}: {msg['content']}"
for msg in to_summarize
])
response = self.client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=500,
messages=[{
'role': 'user',
'content': f"Summarize this conversation:\n\n{conversation_text}"
}]
)
# Update memory
self.memory['summary'] = response.content[0].text
self.memory['messages'] = self.memory['messages'][-10:]
def _save(self):
"""Save session to disk."""
with open(f'sessions/{self.session_id}.json', 'w') as f:
json.dump(self.memory, f, indent=2)
# Usage
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")
# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")
# ... later session
response2 = conversation.add_turn("Show me an example") # Remembers context
Combine multiple techniques for optimal results.
from anthropic import Anthropic
from openai import OpenAI
import chromadb
class HybridMemorySystem:
"""
Combines:
- Rolling summarization (short-term compression)
- RAG retrieval (long-term memory)
- Prompt caching (cost optimization)
- Progressive compression (adaptive behavior)
"""
def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
self.anthropic = anthropic_client
self.openai = openai_client
# Recent messages (verbatim)
self.recent_messages = []
self.recent_window = 10
# Rolling summary
self.rolling_summary = None
# Vector store (long-term)
self.chroma = chromadb.Client()
self.collection = self.chroma.create_collection(name="memory")
self.message_counter = 0
# Compression thresholds
self.thresholds = {
'light': 0.70, # Start basic compression
'medium': 0.85, # Aggressive summarization
'heavy': 0.95 # Emergency measures
}
def add_message(self, message: dict):
"""Add message with intelligent compression."""
self.recent_messages.append(message)
# Check compression needs
usage_ratio = self._estimate_usage()
if usage_ratio >= self.thresholds['heavy']:
self._emergency_compress()
elif usage_ratio >= self.thresholds['medium']:
self._medium_compress()
elif usage_ratio >= self.thresholds['light']:
self._light_compress()
def _light_compress(self):
"""Remove redundancy, archive to vector store."""
if len(self.recent_messages) > self.recent_window * 1.5:
# Archive oldest to vector store
to_archive = self.recent_messages[:5]
for msg in to_archive:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[5:]
def _medium_compress(self):
"""Generate rolling summary, aggressive archival."""
if len(self.recent_messages) > self.recent_window:
# Summarize older messages
to_summarize = self.recent_messages[:-self.recent_window]
summary_text = "\n\n".join([
f"{msg['role']}: {msg['content']}"
for msg in to_summarize
])
if self.rolling_summary:
summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"
response = self.anthropic.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=400,
messages=[{
'role': 'user',
'content': f"Update summary:\n{summary_text}"
}]
)
self.rolling_summary = response.content[0].text
# Archive all summarized messages
for msg in to_summarize:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[-self.recent_window:]
def _emergency_compress(self):
"""Extreme compression for near-limit situations."""
# Keep only 5 most recent messages
to_archive = self.recent_messages[:-5]
for msg in to_archive:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[-5:]
# Compress summary further if needed
if self.rolling_summary and len(self.rolling_summary) > 1000:
response = self.anthropic.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=200,
messages=[{
'role': 'user',
'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
}]
)
self.rolling_summary = response.content[0].text
def _archive_to_vectorstore(self, message: dict):
"""Store in vector database for retrieval."""
embedding_response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=message['content']
)
self.collection.add(
embeddings=[embedding_response.data[0].embedding],
documents=[message['content']],
metadatas=[{'role': message['role']}],
ids=[f"msg_{self.message_counter}"]
)
self.message_counter += 1
def get_context(self, current_query: str, max_tokens: int = 8000):
"""Build optimal context for current query."""
context = []
token_count = 0
# 1. Add rolling summary (if exists)
if self.rolling_summary:
summary_msg = {
'role': 'user',  # the Messages API accepts only user/assistant roles in `messages`
'content': [
{
'type': 'text',
'text': f"[Conversation Summary]\n{self.rolling_summary}",
'cache_control': {'type': 'ephemeral'} # Cache it
}
]
}
context.append(summary_msg)
token_count += len(self.rolling_summary) // 4
# 2. Retrieve relevant historical context (RAG)
if token_count < max_tokens * 0.3:
query_embedding = self.openai.embeddings.create(
model="text-embedding-3-small",
input=current_query
)
results = self.collection.query(
query_embeddings=[query_embedding.data[0].embedding],
n_results=5
)
for i, doc in enumerate(results['documents'][0]):
if token_count + len(doc) // 4 > max_tokens * 0.3:
break
metadata = results['metadatas'][0][i]
context.append({
'role': metadata['role'],
'content': f"[Retrieved] {doc}"
})
token_count += len(doc) // 4
# 3. Add recent messages verbatim
for msg in self.recent_messages:
if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
break
context.append(msg)
token_count += len(msg['content']) // 4
return context
def _estimate_usage(self):
"""Estimate current context window usage."""
total_tokens = 0
if self.rolling_summary:
total_tokens += len(self.rolling_summary) // 4
for msg in self.recent_messages:
total_tokens += len(msg['content']) // 4
return total_tokens / 200000 # Claude Sonnet context window
# Usage
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")
memory = HybridMemorySystem(anthropic_client, openai_client)
# Add messages over time
for i in range(1000):
memory.add_message({
'role': 'user' if i % 2 == 0 else 'assistant',
'content': f"Message {i} with some content..."
})
# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)
# Use with Claude
response = anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=context + [{
'role': 'user',
'content': current_query
}]
)
| Technique | Compression Ratio | Quality Loss | Latency | Cost Impact |
|-----------|-------------------|--------------|---------|-------------|
| Extractive | 2-3x | <1% | <10ms | None |
| Abstractive | 5-10x | 2-5% | 1-2s | +$0.001/turn |
| Hierarchical | 20x+ | 5-8% | 2-5s | +$0.003/turn |
| LLMLingua | 20x | 1.5% | 500ms | None |
| RAG | Variable | <1% | 100-300ms | +$0.0005/turn |
| Prompt Caching | N/A | 0% | 0ms | -90% |
Customer Support (50-turn conversation):
Code Assistant (100-turn session):
Educational Tutor (multi-session):
Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)
1,000 conversations, 50 turns each:
No compression:
With rolling summarization:
With hybrid system + caching:
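The comparison above can be approximated with a small calculator. This is a rough sketch rather than the source's own figures: it assumes every turn resends the full history as input, uses a hypothetical 200 tokens per turn and a hypothetical 2,000-token cap on the summarized history, and ignores output tokens and the cost of the summarization calls themselves. Only the $3 per 1M input tokens price is taken from the example above.

```python
def input_cost_usd(turns, tokens_per_turn, price_per_mtok, context_cap=None):
    """Estimate the input cost of one conversation that resends history each turn.

    context_cap (hypothetical parameter) models a compressed history that never
    grows beyond a fixed token budget, e.g. a rolling summary plus recent turns.
    """
    total_tokens = 0
    for turn in range(1, turns + 1):
        history_tokens = turn * tokens_per_turn
        if context_cap is not None:
            history_tokens = min(history_tokens, context_cap)
        total_tokens += history_tokens
    return total_tokens * price_per_mtok / 1_000_000


# Assumed workload: 50 turns, 200 tokens per turn, $3 per 1M input tokens
uncompressed = input_cost_usd(50, 200, 3.0)
capped = input_cost_usd(50, 200, 3.0, context_cap=2000)

print(f"Per conversation: ${uncompressed:.3f} vs ${capped:.3f}")
print(f"Per 1,000 conversations: ${uncompressed * 1000:.0f} vs ${capped * 1000:.0f}")
```

Because uncompressed history grows linearly per turn, total input tokens grow quadratically with conversation length, which is why the savings widen for longer sessions.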
Best for: Hybrid memory systems with minimal code
from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

# Automatically handles compression, summarization, RAG
# Method names follow the Mem0 platform client (add/search);
# verify them against the SDK version you have installed.
client.add(
    [
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ],
    user_id="user123"
)

# Retrieve relevant context
context = client.search(
    "What programming language am I using?",
    user_id="user123"
)
Features:
Pricing: $0.40/1K memory operations
Best for: Low-latency production deployments
from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

# Add to session
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)

# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")
Features:
Pricing: Open-source (self-hosted) or $0.50/1K operations (cloud)
Best for: Self-hosted vector storage
import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")

# Store embeddings
collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],
    ids=["msg1"]
)

# Retrieve
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)
Features:
Pricing: Free (self-hosted)
Best for: Rapid prototyping and experimentation
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
Features:
Pricing: Free (uses your LLM API costs)
Best for: Extreme compression with minimal quality loss
from llmlingua import PromptCompressor

compressor = PromptCompressor()
compressed = compressor.compress_prompt(
    context="Long conversation history...",
    instruction="Current user query",
    target_token=500
)
# Achieves 20x compression with 1.5% accuracy loss
Features:
Pricing: Free (open-source)
Requirements:
Recommended approach:
Implementation:
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)

# Add customer conversation
# customer_conversation: your list of turns, each a dict with
# 'customer_message' and 'agent_response' keys
for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )

# Retrieve compressed context
context = memory.load_memory_variables({})
Requirements:
Recommended approach:
Implementation:
from anthropic import Anthropic
from openai import OpenAI

client = Anthropic(api_key="your-api-key")
openai_client = OpenAI(api_key="your-openai-key")  # used by RAGMemory for embeddings

class CodeAssistantMemory:
    def __init__(self):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # Store in hierarchy
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })
        # Store in RAG for retrieval
        self.rag.add_message(code_change)
        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)
        return summary_context + rag_context
Requirements:
Recommended approach:
Implementation:
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id
        # Vector store for all sessions
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )
        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })
Don't compress aggressively from the start. Use thresholds:
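A minimal sketch of threshold-based escalation follows. The 0.70 / 0.85 / 0.95 defaults mirror the `ProgressiveCompressor` thresholds used earlier in this document; the level names (`light`, `moderate`, `aggressive`) and the function itself are hypothetical and only illustrate the idea.

```python
def compression_level(usage: float, thresholds=(0.70, 0.85, 0.95)) -> str:
    """Map context-window usage (0.0 to 1.0) to a compression level.

    The level names are illustrative assumptions, not part of any library.
    """
    levels = ("light", "moderate", "aggressive")
    selected = "none"
    # Thresholds are ascending, so the last one crossed wins
    for threshold, level in zip(thresholds, levels):
        if usage >= threshold:
            selected = level
    return selected


for usage in (0.50, 0.72, 0.90, 0.97):
    print(f"{usage:.0%} of context used -> {compression_level(usage)}")
```

Keeping the mapping in one small function makes the thresholds easy to tune per application without touching the compression logic itself.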
Single-technique approaches are suboptimal. Best production systems use:
Track compression impact:
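One way to track impact is to record token counts before and after each compression pass. The sketch below is an assumption-laden example: `CompressionStats`, `measure`, and `estimate_tokens` are hypothetical names, and the token estimate reuses the rough 4-characters-per-token heuristic from the examples in this document rather than a real tokenizer.

```python
from dataclasses import dataclass


def estimate_tokens(text: str) -> int:
    """Rough token estimate (about 4 characters per token)."""
    return len(text) // 4


@dataclass
class CompressionStats:
    original_tokens: int
    compressed_tokens: int

    @property
    def ratio(self) -> float:
        """Compression ratio, e.g. 20.0 means 20x smaller."""
        return self.original_tokens / max(self.compressed_tokens, 1)

    @property
    def savings(self) -> float:
        """Fraction of tokens removed (0.0 to 1.0)."""
        if self.original_tokens == 0:
            return 0.0
        return 1 - self.compressed_tokens / self.original_tokens


def measure(original_messages, compressed_messages) -> CompressionStats:
    """Compare estimated token counts of two message lists."""
    return CompressionStats(
        original_tokens=sum(estimate_tokens(m["content"]) for m in original_messages),
        compressed_tokens=sum(estimate_tokens(m["content"]) for m in compressed_messages),
    )


original = [{"role": "user", "content": "x" * 400} for _ in range(10)]
compressed = [{"role": "user", "content": "y" * 200}]
stats = measure(original, compressed)
print(f"{stats.ratio:.1f}x compression, {stats.savings:.0%} token savings")
```

Token counts alone do not capture quality loss; in practice they would be logged alongside latency and a task-specific quality check.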
Cache stable content:
Don't cache frequently changing content:
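With Anthropic prompt caching, this split can be expressed by placing stable content first and marking it with `cache_control`, while changing content follows without a cache marker. The sketch below only builds the request arguments; the helper name `build_cached_request` and its parameters are hypothetical, and the exact caching limits (such as minimum cacheable prompt size) should be checked in the provider documentation.

```python
def build_cached_request(system_prompt, rolling_summary, recent_messages, query,
                         model="claude-3-5-sonnet-20241022"):
    """Build messages.create() kwargs with a cached stable prefix."""
    system_blocks = [
        {
            # Stable content: identical across turns, so it is marked cacheable
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"},
        }
    ]
    if rolling_summary:
        # Changing content: placed after the cached prefix, no cache marker
        system_blocks.append({
            "type": "text",
            "text": f"Conversation summary:\n{rolling_summary}",
        })
    return {
        "model": model,
        "max_tokens": 1024,
        "system": system_blocks,
        "messages": list(recent_messages) + [{"role": "user", "content": query}],
    }


request = build_cached_request(
    system_prompt="You are a support assistant for an example product.",
    rolling_summary="User asked about pricing tiers.",
    recent_messages=[
        {"role": "user", "content": "Is there a free plan?"},
        {"role": "assistant", "content": "Yes, with usage limits."},
    ],
    query="What did we discuss about pricing?",
)
print(request["system"][0]["cache_control"])
```

The resulting dictionary can be passed to the client as `anthropic_client.messages.create(**request)`. Ordering matters because caching works on the prompt prefix: anything placed before the cache marker must stay byte-for-byte identical for a cache hit.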
Save compression state for:
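A minimal persistence sketch is shown below, assuming the state consists of the `rolling_summary` and `recent_messages` fields used by the hybrid memory example in this document. The function names, the JSON layout, and the `version` field are hypothetical; a production system would likely also persist the vector store separately.

```python
import json


def save_state(path, rolling_summary, recent_messages):
    """Write compression state to a JSON file."""
    state = {
        "version": 1,  # hypothetical schema version for future migrations
        "rolling_summary": rolling_summary,
        "recent_messages": recent_messages,
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)


def load_state(path):
    """Read compression state back; returns (rolling_summary, recent_messages)."""
    with open(path, encoding="utf-8") as f:
        state = json.load(f)
    return state["rolling_summary"], state["recent_messages"]
```

Restoring a session then amounts to calling `load_state` and assigning the two values back onto a fresh memory object before the next turn.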
Test and optimize:
Plan for:
Solutions:
Solutions:
Solutions:
Solutions:
Solutions:
Solutions:
Compress in real-time as conversation progresses:
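import asyncio

async def streaming_compress(conversation_stream):
    """Compress while streaming responses.

    conversation_stream: async iterator yielding {'role', 'content'} dicts.
    """
    compressor = ProgressiveCompressor()
    async for message in conversation_stream:
        compressor.add_message(message['role'], message['content'])
        # Compression runs as a background task so streaming is not blocked.
        # should_compress() and compress_async() are assumed to exist on
        # ProgressiveCompressor.
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())
    return compressor.get_context()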
Handle concurrent conversations with shared context:
import time

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        # Assumes each session tracks a `last_activity` timestamp (time.time())
        # and that `_archive_session` persists the session before removal.
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]
        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]
Train ML models to score message importance:
from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # Use pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        return result[0]['score']
Maximize information density within token budget:
def optimize_context_allocation(
    summary_tokens: int,
    recent_tokens: int,
    retrieval_tokens: int,
    max_tokens: int
):
    """
    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }
1. Infinite Attention Mechanisms
2. Learned Compression Models
3. Multimodal Session Compression
4. Federated Memory Systems
5. Adaptive Compression Strategies
Last Updated: 2025-11-30 Version: 1.0.0 License: MIT