.claude/skills/building-rag-systems/SKILL.md
Build production RAG systems with semantic chunking, incremental indexing, and filtered retrieval. Use when implementing document ingestion pipelines, vector search with Qdrant, or context-aware retrieval. Covers chunking strategies, change detection, payload indexing, and context expansion. NOT when doing simple similarity search without production requirements.
npx skillsauth add Asmayaseen/hackathon-2 building-rag-systemsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.
# Dependencies
pip install qdrant-client openai pydantic python-frontmatter
# Core components
# 1. Crawler → discovers files, extracts path metadata
# 2. Parser → extracts frontmatter, computes file hash
# 3. Chunker → semantic split on ## headers, 400 tokens, 15% overlap
# 4. Embedder → batched OpenAI embeddings
# 5. Uploader → Qdrant upsert with indexed payloads
┌──────────┐ ┌────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐
│ Crawler │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
└──────────┘ └────────┘ └─────────┘ └──────────┘ └──────────┘
│ │ │ │ │
Discovers Extracts Splits by Generates Upserts to
files frontmatter semantic vectors Qdrant
+ file hash boundaries (batched) (batched)
class SemanticChunker:
"""
Production chunking:
- Split on ## headers (semantic boundaries)
- Target 400 tokens (NVIDIA benchmark optimal)
- 15% overlap for context continuity
- Track prev/next for context expansion
"""
SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
TOKENS_PER_WORD = 1.3
def __init__(
self,
target_tokens: int = 400,
max_tokens: int = 512,
overlap_percent: float = 0.15,
):
self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
self.overlap_words = int(self.target_words * overlap_percent)
def chunk(self, content: str, file_hash: str) -> list[Chunk]:
sections = self.SECTION_PATTERN.split(content)
chunks = []
for idx, section in enumerate(sections):
content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"
chunks.append(Chunk(
id=chunk_id,
text=section,
chunk_index=idx,
total_chunks=len(sections),
prev_chunk_id=chunks[-1].id if chunks else None,
content_hash=content_hash,
source_file_hash=file_hash,
))
# Set next_chunk_id on previous
if len(chunks) > 1:
chunks[-2].next_chunk_id = chunk_id
return chunks
def compute_file_hash(file_path: str) -> str:
"""SHA-256 for change detection."""
with open(file_path, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
class QdrantStateTracker:
"""Query Qdrant payloads directly - no external state DB needed."""
def get_indexed_files(self, book_id: str) -> dict[str, str]:
"""Returns {file_path: file_hash} from Qdrant."""
indexed = {}
offset = None
while True:
points, next_offset = self.client.scroll(
collection_name=self.collection,
scroll_filter=Filter(must=[
FieldCondition(key="book_id", match=MatchValue(value=book_id))
]),
limit=100,
offset=offset,
with_payload=["source_file", "source_file_hash"],
with_vectors=False,
)
for point in points:
indexed[point.payload["source_file"]] = point.payload["source_file_hash"]
if next_offset is None:
break
offset = next_offset
return indexed
def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
"""Compare filesystem vs index."""
new = [p for p in current if p not in indexed]
deleted = [p for p in indexed if p not in current]
modified = [p for p in current if p in indexed and current[p] != indexed[p]]
return new, modified, deleted
class OpenAIEmbedder:
def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
self.client = OpenAI()
self.model = model
self.batch_size = batch_size # OpenAI recommendation
def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
embedded = []
for i in range(0, len(chunks), self.batch_size):
batch = chunks[i:i + self.batch_size]
response = self.client.embeddings.create(
input=[c.text for c in batch],
model=self.model,
)
for chunk, data in zip(batch, response.data):
embedded.append(EmbeddedChunk(**chunk.dict(), embedding=data.embedding))
return embedded
def create_collection(self, recreate: bool = False):
"""Create collection with proper indexes for filtered retrieval."""
self.client.create_collection(
collection_name=self.collection,
vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)
# Index ALL fields you filter by
indexes = [
("book_id", PayloadSchemaType.KEYWORD), # Tenant isolation
("module", PayloadSchemaType.KEYWORD), # Content filter
("chapter", PayloadSchemaType.INTEGER), # Range filter
("hardware_tier", PayloadSchemaType.INTEGER),# Personalization
("proficiency_level", PayloadSchemaType.KEYWORD),
("parent_doc_id", PayloadSchemaType.KEYWORD),# Context expansion
("source_file_hash", PayloadSchemaType.KEYWORD), # Change detection
]
for field, schema in indexes:
self.client.create_payload_index(
collection_name=self.collection,
field_name=field,
field_schema=schema,
)
def build_filter(self, query: SearchQuery) -> Filter:
"""Build Qdrant filter with all conditions (AND logic)."""
conditions = []
# Required: Tenant isolation
conditions.append(FieldCondition(
key="book_id", match=MatchValue(value=query.book_id)
))
# Required: Hardware tier (lte = "tier X or lower")
conditions.append(FieldCondition(
key="hardware_tier", range=Range(lte=query.hardware_tier)
))
# Optional: Module exact match
if query.module:
conditions.append(FieldCondition(
key="module", match=MatchValue(value=query.module)
))
# Optional: Chapter range
if query.chapter_min or query.chapter_max:
chapter_range = Range()
if query.chapter_min:
chapter_range.gte = query.chapter_min
if query.chapter_max:
chapter_range.lte = query.chapter_max
conditions.append(FieldCondition(key="chapter", range=chapter_range))
# Optional: Proficiency OR logic
if query.proficiency_levels:
conditions.append(FieldCondition(
key="proficiency_level",
match=MatchAny(any=query.proficiency_levels),
))
return Filter(must=conditions)
def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
"""Walk prev_chunk_id/next_chunk_id chain for surrounding context."""
current = self.get_chunk_by_id(chunk_id)
if not current:
return []
# Walk backwards
prev_chunks = []
prev_id = current.prev_chunk_id
for _ in range(prev):
if not prev_id:
break
chunk = self.get_chunk_by_id(prev_id)
if not chunk:
break
prev_chunks.insert(0, chunk)
prev_id = chunk.prev_chunk_id
# Walk forwards
next_chunks = []
next_id = current.next_chunk_id
for _ in range(next):
if not next_id:
break
chunk = self.get_chunk_by_id(next_id)
if not chunk:
break
next_chunks.append(chunk)
next_id = chunk.next_chunk_id
return prev_chunks + [current] + next_chunks
def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
"""Get all chunks for a document, ordered by chunk_index."""
points, _ = self.client.scroll(
collection_name=self.collection,
scroll_filter=Filter(must=[
FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
]),
limit=100,
with_payload=True,
with_vectors=False,
)
chunks = [self._to_chunk(p) for p in points]
chunks.sort(key=lambda c: c.chunk_index)
return chunks
class ChunkPayload(BaseModel):
"""Complete payload for filtered retrieval and context expansion."""
# Tenant isolation
book_id: str
# Content filters (all indexed)
module: str
chapter: int
lesson: int
hardware_tier: int
proficiency_level: str
# Display content
text: str
section_title: Optional[str]
source_file: str
# Context expansion
parent_doc_id: str
chunk_index: int
total_chunks: int
prev_chunk_id: Optional[str]
next_chunk_id: Optional[str]
# Change detection
content_hash: str
source_file_hash: str
| Don't | Do Instead | |-------|------------| | Fixed character chunking | Semantic boundaries (## headers) | | Position-based chunk IDs | Content hash for stable IDs | | No overlap between chunks | 10-20% overlap for continuity | | Full re-index on every change | Incremental with file hash detection | | Missing payload indexes | Index every field you filter by | | Synchronous embedding | Batch with background jobs | | External state database | Qdrant-native state tracking |
Run: python scripts/verify.py
scaffolding-fastapi-dapr - API patterns for search endpointsstreaming-llm-responses - Streaming RAG responsesdevelopment
Systematic methodology for debugging bugs, test failures, and unexpected behavior. Use when encountering any technical issue before proposing fixes. Covers root cause investigation, pattern analysis, hypothesis testing, and fix implementation. Use ESPECIALLY when under time pressure, "just one quick fix" seems obvious, or you've already tried multiple fixes. NOT for exploratory code reading.
development
Build beautiful, accessible UIs with shadcn/ui components in Next.js. Use when creating forms, dialogs, tables, sidebars, or any UI components. Covers installation, component patterns, react-hook-form + Zod validation, and dark mode setup. NOT when building non-React applications or using different component libraries.
tools
Implement real-time streaming UI patterns for AI chat applications. Use when adding response lifecycle handlers, progress indicators, client effects, or thread state synchronization. Covers onResponseStart/End, onEffect, ProgressUpdateEvent, and client tools. NOT when building basic chat without real-time feedback.
tools
Builds AI agents using OpenAI Agents SDK with async/await patterns and multi-agent orchestration. Use when creating tutoring agents, building agent handoffs, implementing tool-calling agents, or orchestrating multiple specialists. Covers Agent class, Runner patterns, function tools, guardrails, and streaming responses. NOT when using raw OpenAI API without SDK or other agent frameworks like LangChain.