skills/asmayaseen/building-rag-systems/SKILL.md
Build production RAG systems with semantic chunking, incremental indexing, and filtered retrieval. Use when implementing document ingestion pipelines, vector search with Qdrant, or context-aware retrieval. Covers chunking strategies, change detection, payload indexing, and context expansion. NOT when doing simple similarity search without production requirements.
Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.

    # Dependencies
    pip install qdrant-client openai pydantic python-frontmatter

    # Core components
    # 1. Crawler  → discovers files, extracts path metadata
    # 2. Parser   → extracts frontmatter, computes file hash
    # 3. Chunker  → semantic split on ## headers, 400 tokens, 15% overlap
    # 4. Embedder → batched OpenAI embeddings
    # 5. Uploader → Qdrant upsert with indexed payloads

    ┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
    │ Crawler  │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
    └──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
         │              │              │               │               │
     Discovers      Extracts       Splits by       Generates       Upserts to
       files       frontmatter     semantic         vectors          Qdrant
                   + file hash    boundaries       (batched)        (batched)
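The crawler stage is described only as "discovers files, extracts path metadata". A minimal sketch of that step follows, assuming a purely hypothetical directory layout of `<module>/chapter-<NN>/lesson-<NN>.md`; the pattern, the function names and the returned keys are illustrative and would need adapting to the real content tree.

```python
import re
from pathlib import Path
from typing import Optional

# Hypothetical layout: <module>/chapter-<NN>/lesson-<NN>.md
PATH_PATTERN = re.compile(
    r"(?P<module>[^/]+)/chapter-(?P<chapter>\d+)/lesson-(?P<lesson>\d+)\.md$"
)


def extract_path_metadata(relative_path: str) -> Optional[dict]:
    """Parse module/chapter/lesson from a relative POSIX path, or return None."""
    match = PATH_PATTERN.search(relative_path)
    if match is None:
        return None
    return {
        "module": match["module"],
        "chapter": int(match["chapter"]),
        "lesson": int(match["lesson"]),
    }


def crawl(root: str) -> list[dict]:
    """Discover Markdown files under root and attach their path metadata."""
    root_path = Path(root)
    discovered = []
    for path in sorted(root_path.rglob("*.md")):
        relative = path.relative_to(root_path).as_posix()
        metadata = extract_path_metadata(relative)
        if metadata is not None:  # skip files outside the assumed layout
            discovered.append({"source_file": relative, **metadata})
    return discovered
```

The extracted values line up with the `module`, `chapter` and `lesson` payload fields used for filtering later in the pipeline.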

    import hashlib
    import re

    # Chunk is the project's chunk model (not shown here); it needs a
    # next_chunk_id field that defaults to None.


    class SemanticChunker:
        """
        Production chunking:
        - Split on ## headers (semantic boundaries)
        - Target 400 tokens (NVIDIA benchmark optimal)
        - 15% overlap for context continuity
        - Track prev/next for context expansion
        """

        SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
        TOKENS_PER_WORD = 1.3  # approximate words-to-tokens ratio

        def __init__(
            self,
            target_tokens: int = 400,
            max_tokens: int = 512,
            overlap_percent: float = 0.15,
        ):
            self.max_tokens = max_tokens
            self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
            self.overlap_words = int(self.target_words * overlap_percent)

        def chunk(self, content: str, file_hash: str) -> list[Chunk]:
            # Drop empty pieces, e.g. the empty string before a leading "## ".
            sections = [s for s in self.SECTION_PATTERN.split(content) if s.strip()]
            # Splitting oversized sections down to target_words with
            # overlap_words is omitted from this excerpt.
            chunks = []
            for idx, section in enumerate(sections):
                content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
                chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"
                chunks.append(Chunk(
                    id=chunk_id,
                    text=section,
                    chunk_index=idx,
                    total_chunks=len(sections),
                    prev_chunk_id=chunks[-1].id if chunks else None,
                    content_hash=content_hash,
                    source_file_hash=file_hash,
                ))
                # Set next_chunk_id on previous
                if len(chunks) > 1:
                    chunks[-2].next_chunk_id = chunk_id
            return chunks
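The chunker computes `target_words` and `overlap_words`, but the excerpt does not show how a section longer than the target is cut down. One possible way to do it, shown here as a sketch and not as the author's implementation, is a sliding window over words; `split_with_overlap` is a hypothetical helper name.

```python
def split_with_overlap(text: str, target_words: int, overlap_words: int) -> list[str]:
    """Split text into windows of at most target_words words.

    Each window after the first starts overlap_words before the previous
    window ended, so neighbouring windows share that many words.
    Whitespace inside a window is normalised to single spaces.
    """
    if not 0 <= overlap_words < target_words:
        raise ValueError("overlap_words must be in [0, target_words)")
    words = text.split()
    if len(words) <= target_words:
        return [text.strip()] if words else []
    step = target_words - overlap_words
    windows = []
    for start in range(0, len(words), step):
        windows.append(" ".join(words[start:start + target_words]))
        if start + target_words >= len(words):
            break  # this window already reaches the end of the text
    return windows
```

With the class defaults (400 target tokens, 1.3 tokens per word, 15% overlap) the chunker would pass `target_words=307` and `overlap_words=46`. Because the sketch joins words with single spaces, Markdown line structure inside a window is lost; a production version would likely split on sentence or paragraph boundaries instead.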

    import hashlib


    def compute_file_hash(file_path: str) -> str:
        """SHA-256 for change detection."""
        with open(file_path, 'rb') as f:
            return hashlib.sha256(f.read()).hexdigest()

    from qdrant_client import QdrantClient
    from qdrant_client.models import FieldCondition, Filter, MatchValue


    class QdrantStateTracker:
        """Query Qdrant payloads directly - no external state DB needed."""

        def __init__(self, client: QdrantClient, collection: str):
            self.client = client
            self.collection = collection

        def get_indexed_files(self, book_id: str) -> dict[str, str]:
            """Returns {file_path: file_hash} from Qdrant."""
            indexed = {}
            offset = None
            while True:
                # scroll() returns one page of points plus the offset of the next page
                points, next_offset = self.client.scroll(
                    collection_name=self.collection,
                    scroll_filter=Filter(must=[
                        FieldCondition(key="book_id", match=MatchValue(value=book_id))
                    ]),
                    limit=100,
                    offset=offset,
                    with_payload=["source_file", "source_file_hash"],
                    with_vectors=False,
                )
                for point in points:
                    indexed[point.payload["source_file"]] = point.payload["source_file_hash"]
                if next_offset is None:
                    break
                offset = next_offset
            return indexed

        def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
            """Compare filesystem vs index."""
            new = [p for p in current if p not in indexed]
            deleted = [p for p in indexed if p not in current]
            modified = [p for p in current if p in indexed and current[p] != indexed[p]]
            return new, modified, deleted
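The three lists returned by `detect_changes` still have to be turned into work. Since chunk IDs are derived from content hashes, a modified file produces new IDs, so its old points have to be deleted before the new chunks are upserted. The sketch below shows that planning step on plain dictionaries; `plan_sync` is a hypothetical helper, not part of the skill.

```python
def plan_sync(
    current: dict[str, str], indexed: dict[str, str]
) -> tuple[list[str], list[str]]:
    """Return (files_to_index, files_to_purge) from two {path: hash} maps."""
    new = [p for p in current if p not in indexed]
    modified = [p for p in current if p in indexed and current[p] != indexed[p]]
    deleted = [p for p in indexed if p not in current]
    # Modified files appear in both lists: purge stale chunks, then re-index.
    return sorted(new + modified), sorted(modified + deleted)


if __name__ == "__main__":
    on_disk = {"a.md": "h1", "b.md": "h2-edited", "d.md": "h4"}
    in_index = {"a.md": "h1", "b.md": "h2", "c.md": "h3"}
    to_index, to_purge = plan_sync(on_disk, in_index)
    print(to_index)   # ['b.md', 'd.md']
    print(to_purge)   # ['b.md', 'c.md']
```

Unchanged files (`a.md` in the example) appear in neither list, which is what makes the update incremental.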

    from openai import OpenAI


    class OpenAIEmbedder:
        def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
            self.client = OpenAI()  # reads OPENAI_API_KEY from the environment
            self.model = model
            self.batch_size = batch_size  # OpenAI recommendation

        def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
            embedded = []
            for i in range(0, len(chunks), self.batch_size):
                batch = chunks[i:i + self.batch_size]
                response = self.client.embeddings.create(
                    input=[c.text for c in batch],
                    model=self.model,
                )
                # response.data follows the order of the inputs
                for chunk, data in zip(batch, response.data):
                    # model_dump() is the Pydantic v2 API; on Pydantic v1 use chunk.dict()
                    embedded.append(EmbeddedChunk(**chunk.model_dump(), embedding=data.embedding))
            return embedded
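The batching loop above has no handling for transient failures such as rate limits. The following sketch separates batching from the API call so that retries with exponential backoff can be added and tested without network access. `embed_in_batches` and its `embed_batch` callback are hypothetical names; in real code the callback would wrap `client.embeddings.create`, and the broad `except` would be narrowed to the client's rate-limit and timeout errors.

```python
import time
from typing import Callable, Sequence


def embed_in_batches(
    texts: Sequence[str],
    embed_batch: Callable[[list[str]], list[list[float]]],
    batch_size: int = 20,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> list[list[float]]:
    """Embed texts batch by batch, retrying each failed batch with backoff."""
    vectors: list[list[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start:start + batch_size])
        for attempt in range(max_retries + 1):
            try:
                result = embed_batch(batch)
                break
            except Exception:  # assumption: narrow this in production code
                if attempt == max_retries:
                    raise
                time.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
        if len(result) != len(batch):
            raise ValueError("embedding count does not match batch size")
        vectors.extend(result)
    return vectors
```

Injecting the callback also makes it straightforward to run the same loop inside a background job, as the anti-pattern table recommends.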

    from qdrant_client.models import Distance, PayloadSchemaType, VectorParams


    def create_collection(self, recreate: bool = False):
        """Create collection with proper indexes for filtered retrieval."""
        if self.client.collection_exists(self.collection):
            if not recreate:
                return  # keep the existing collection and its indexes
            self.client.delete_collection(self.collection)

        self.client.create_collection(
            collection_name=self.collection,
            # 1536 is the output size of text-embedding-3-small
            vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
        )

        # Index ALL fields you filter by
        indexes = [
            ("book_id", PayloadSchemaType.KEYWORD),            # Tenant isolation
            ("module", PayloadSchemaType.KEYWORD),             # Content filter
            ("chapter", PayloadSchemaType.INTEGER),            # Range filter
            ("hardware_tier", PayloadSchemaType.INTEGER),      # Personalization
            ("proficiency_level", PayloadSchemaType.KEYWORD),
            ("parent_doc_id", PayloadSchemaType.KEYWORD),      # Context expansion
            ("source_file_hash", PayloadSchemaType.KEYWORD),   # Change detection
        ]
        for field, schema in indexes:
            self.client.create_payload_index(
                collection_name=self.collection,
                field_name=field,
                field_schema=schema,
            )
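One detail the uploader has to handle: Qdrant accepts only unsigned integers or UUIDs as point IDs, while the chunker produces string IDs of the form `<file hash>_<content hash>_<index>`. A possible bridge, sketched here under that assumption, is a deterministic name-based UUID. The `rag-chunk:` prefix, the `chunk_id` payload key and both function names are hypothetical.

```python
import uuid


def point_id_for(chunk_id: str) -> str:
    """Map a string chunk ID to a deterministic UUID string."""
    # uuid5 is a name-based hash: the same chunk_id always yields the same UUID,
    # so re-uploading an unchanged chunk overwrites its point instead of duplicating it.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"rag-chunk:{chunk_id}"))


def to_point(chunk_id: str, vector: list[float], payload: dict) -> dict:
    """Build a plain-dict point; the original chunk ID is kept in the payload."""
    return {
        "id": point_id_for(chunk_id),
        "vector": vector,
        "payload": {**payload, "chunk_id": chunk_id},
    }
```

The three keys correspond to the `id`, `vector` and `payload` fields of qdrant-client's `PointStruct`. Keeping the string ID in the payload lets the `prev_chunk_id` and `next_chunk_id` links be resolved either by payload lookup or by recomputing the UUID.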

    from qdrant_client.models import FieldCondition, Filter, MatchAny, MatchValue, Range


    def build_filter(self, query: SearchQuery) -> Filter:
        """Build Qdrant filter with all conditions (AND logic)."""
        conditions = []

        # Required: Tenant isolation
        conditions.append(FieldCondition(
            key="book_id", match=MatchValue(value=query.book_id)
        ))

        # Required: Hardware tier (lte = "tier X or lower")
        conditions.append(FieldCondition(
            key="hardware_tier", range=Range(lte=query.hardware_tier)
        ))

        # Optional: Module exact match
        if query.module:
            conditions.append(FieldCondition(
                key="module", match=MatchValue(value=query.module)
            ))

        # Optional: Chapter range (compare against None so that chapter 0 is honoured)
        if query.chapter_min is not None or query.chapter_max is not None:
            conditions.append(FieldCondition(
                key="chapter",
                range=Range(gte=query.chapter_min, lte=query.chapter_max),
            ))

        # Optional: Proficiency OR logic
        if query.proficiency_levels:
            conditions.append(FieldCondition(
                key="proficiency_level",
                match=MatchAny(any=query.proficiency_levels),
            ))

        return Filter(must=conditions)
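`SearchQuery` is not defined in this document. The sketch below infers a possible shape from the fields `build_filter` reads, and adds a pure-Python predicate that mirrors the same AND logic. The predicate is a hypothetical helper, useful mainly for unit-testing filter semantics against sample payloads without a running Qdrant instance; the real `SearchQuery` may carry more fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SearchQuery:
    """Assumed shape: only the fields that build_filter reads."""
    book_id: str
    hardware_tier: int
    module: Optional[str] = None
    chapter_min: Optional[int] = None
    chapter_max: Optional[int] = None
    proficiency_levels: Optional[list[str]] = None


def payload_matches(payload: dict, query: SearchQuery) -> bool:
    """Return True when a payload satisfies every condition of the query."""
    if payload["book_id"] != query.book_id:
        return False  # tenant isolation
    if payload["hardware_tier"] > query.hardware_tier:
        return False  # only "tier X or lower"
    if query.module and payload["module"] != query.module:
        return False
    if query.chapter_min is not None and payload["chapter"] < query.chapter_min:
        return False
    if query.chapter_max is not None and payload["chapter"] > query.chapter_max:
        return False
    if query.proficiency_levels and payload["proficiency_level"] not in query.proficiency_levels:
        return False  # OR across the listed levels
    return True
```

Only the proficiency condition is an OR, and only within its own list of levels; every condition is combined with the others by AND, matching `Filter(must=...)`.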

    def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
        """Walk prev_chunk_id/next_chunk_id chain for surrounding context."""
        current = self.get_chunk_by_id(chunk_id)
        if not current:
            return []

        # Walk backwards
        prev_chunks = []
        prev_id = current.prev_chunk_id
        for _ in range(prev):
            if not prev_id:
                break
            chunk = self.get_chunk_by_id(prev_id)
            if not chunk:
                break
            prev_chunks.insert(0, chunk)
            prev_id = chunk.prev_chunk_id

        # Walk forwards
        next_chunks = []
        next_id = current.next_chunk_id
        for _ in range(next):
            if not next_id:
                break
            chunk = self.get_chunk_by_id(next_id)
            if not chunk:
                break
            next_chunks.append(chunk)
            next_id = chunk.next_chunk_id

        return prev_chunks + [current] + next_chunks
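When several search hits come from the same document, their expanded windows can overlap, and passing them to the model as-is would repeat text. A minimal sketch of merging such windows follows; `ChunkStub` is a hypothetical stand-in carrying only the fields the merge needs, and `merge_windows` is not part of the skill.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChunkStub:
    """Hypothetical stand-in for the project's Chunk model."""
    id: str
    parent_doc_id: str
    chunk_index: int
    text: str


def merge_windows(windows: list[list[ChunkStub]]) -> list[ChunkStub]:
    """Deduplicate chunks across expanded windows and restore document order."""
    unique: dict[str, ChunkStub] = {}
    for window in windows:
        for chunk in window:
            unique.setdefault(chunk.id, chunk)  # keep the first copy of each chunk
    return sorted(unique.values(), key=lambda c: (c.parent_doc_id, c.chunk_index))
```

Sorting by `(parent_doc_id, chunk_index)` groups chunks by document and keeps each document's chunks in reading order, at the cost of discarding the original relevance ranking of the hits.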

    def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
        """Get all chunks for a document, ordered by chunk_index."""
        chunks = []
        offset = None
        # Page through the results so documents with more than 100 chunks are complete
        while True:
            points, offset = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(must=[
                    FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
                ]),
                limit=100,
                offset=offset,
                with_payload=True,
                with_vectors=False,
            )
            chunks.extend(self._to_chunk(p) for p in points)
            if offset is None:
                break
        chunks.sort(key=lambda c: c.chunk_index)
        return chunks

    from typing import Optional

    from pydantic import BaseModel


    class ChunkPayload(BaseModel):
        """Complete payload for filtered retrieval and context expansion."""

        # Tenant isolation
        book_id: str

        # Content filters (index each field you filter by)
        module: str
        chapter: int
        lesson: int
        hardware_tier: int
        proficiency_level: str

        # Display content
        text: str
        section_title: Optional[str] = None
        source_file: str

        # Context expansion
        parent_doc_id: str
        chunk_index: int
        total_chunks: int
        prev_chunk_id: Optional[str] = None  # None for the first chunk
        next_chunk_id: Optional[str] = None  # None for the last chunk

        # Change detection
        content_hash: str
        source_file_hash: str

| Don't | Do Instead |
|-------|------------|
| Fixed character chunking | Semantic boundaries (## headers) |
| Position-based chunk IDs | Content hash for stable IDs |
| No overlap between chunks | 10-20% overlap for continuity |
| Full re-index on every change | Incremental with file hash detection |
| Missing payload indexes | Index every field you filter by |
| Synchronous embedding | Batch with background jobs |
| External state database | Qdrant-native state tracking |

Run: python scripts/verify.py

- scaffolding-fastapi-dapr - API patterns for search endpoints
- streaming-llm-responses - Streaming RAG responses