Combine vector and keyword search for improved retrieval. Use when implementing RAG systems, building search engines, or when neither approach alone provides sufficient recall.
Patterns for combining vector similarity and keyword-based search.
Query → ┬─► Vector Search ──► Candidates ─┐
        │                                 │
        └─► Keyword Search ─► Candidates ─┴─► Fusion ─► Results
| Method        | Description              | Best For        |
| ------------- | ------------------------ | --------------- |
| RRF           | Reciprocal Rank Fusion   | General purpose |
| Linear        | Weighted sum of scores   | Tunable balance |
| Cross-encoder | Rerank with neural model | Highest quality |
| Cascade       | Filter then rerank       | Efficiency      |
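The Cascade row ("filter then rerank") can be sketched in a few lines. This is a minimal illustration under stated assumptions, not a production design: `cascade_search`, `toy_scorer`, and the sample documents are hypothetical, the first stage is plain term overlap, and the second stage is whatever expensive scorer you pass in (a cross-encoder in practice).

```python
from typing import Callable, Dict, List, Tuple


def cascade_search(
    query: str,
    docs: Dict[str, str],
    expensive_score: Callable[[str, str], float],
    shortlist_size: int = 3,
) -> List[Tuple[str, float]]:
    """Stage 1: cheap term-overlap filter. Stage 2: rerank the shortlist."""
    query_terms = set(query.lower().split())

    # Stage 1: count shared terms; drop documents with no overlap at all.
    overlap = {
        doc_id: len(query_terms & set(text.lower().split()))
        for doc_id, text in docs.items()
    }
    shortlist = sorted(
        (doc_id for doc_id, n in overlap.items() if n > 0),
        key=lambda doc_id: overlap[doc_id],
        reverse=True,
    )[:shortlist_size]

    # Stage 2: the expensive scorer only ever sees the shortlist.
    reranked = [(doc_id, expensive_score(query, docs[doc_id])) for doc_id in shortlist]
    return sorted(reranked, key=lambda pair: pair[1], reverse=True)


docs = {
    "a": "hybrid search combines vector and keyword search",
    "b": "keyword search with BM25",
    "c": "baking bread at home",
}
calls = []


def toy_scorer(query: str, text: str) -> float:
    # Stand-in for a cross-encoder: prefers shorter matching documents.
    calls.append(text)
    return 1.0 / len(text.split())


results = cascade_search("keyword search", docs, toy_scorer)
print(results)
print(len(calls))  # number of expensive scorer calls
```

Document `c` never reaches the scorer, which is the point of the cascade: the expensive model runs on the shortlist only.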
from typing import Dict, List, Optional, Tuple
from collections import defaultdict


def reciprocal_rank_fusion(
    result_lists: List[List[Tuple[str, float]]],
    k: int = 60,
    weights: Optional[List[float]] = None
) -> List[Tuple[str, float]]:
    """
    Combine multiple ranked lists using RRF.

    Args:
        result_lists: One ranked list of (doc_id, score) tuples per search
            method, best match first. Only the order is used, not the scores.
        k: RRF constant (higher = flatter curve, so lower-ranked results
            contribute relatively more)
        weights: Optional weights per result list

    Returns:
        Fused ranking as (doc_id, score) tuples
    """
    if weights is None:
        weights = [1.0] * len(result_lists)
    if len(weights) != len(result_lists):
        raise ValueError("weights must have one entry per result list")

    scores = defaultdict(float)

    for result_list, weight in zip(result_lists, weights):
        for rank, (doc_id, _) in enumerate(result_list):
            # RRF formula: weight / (k + rank) with a 1-based rank.
            # enumerate() is 0-based, hence the + 1.
            scores[doc_id] += weight * (1.0 / (k + rank + 1))

    # Sort by fused score
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)
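To make the arithmetic concrete, here is a small worked example. It restates the fusion in a few lines so the snippet runs on its own; `rrf` is a hypothetical compact helper working on bare doc IDs, and the IDs themselves are made up.

```python
from collections import defaultdict
from typing import Dict, List


def rrf(ranked_lists: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Compact RRF over lists of doc IDs (best first), using 1-based ranks."""
    scores: Dict[str, float] = defaultdict(float)
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)


vector_hits = ["doc_a", "doc_b", "doc_c"]
keyword_hits = ["doc_d", "doc_b", "doc_a"]

scores = rrf([vector_hits, keyword_hits])
ranking = sorted(scores, key=scores.get, reverse=True)

for doc_id in ranking:
    print(f"{doc_id}: {scores[doc_id]:.5f}")
# doc_a: 0.03227   (1/61 + 1/63)
# doc_b: 0.03226   (1/62 + 1/62)
# doc_d: 0.01639   (1/61)
# doc_c: 0.01587   (1/63)
```

Documents found by both methods (`doc_a`, `doc_b`) land well above `doc_d`, even though `doc_d` was the top keyword hit. With k = 60 the gap between adjacent ranks is small, so agreement between lists matters more than exact position.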
def linear_combination(
    vector_results: List[Tuple[str, float]],
    keyword_results: List[Tuple[str, float]],
    alpha: float = 0.5
) -> List[Tuple[str, float]]:
    """
    Combine results with linear interpolation.

    Args:
        vector_results: (doc_id, similarity_score) from vector search
        keyword_results: (doc_id, bm25_score) from keyword search
        alpha: Weight for vector search (1-alpha for keyword)

    Returns:
        Fused ranking as (doc_id, score) tuples
    """
    # Min-max normalize scores to [0, 1] so that unbounded BM25 scores and
    # bounded similarity scores become comparable.
    def normalize(results):
        if not results:
            return {}
        scores = [s for _, s in results]
        min_s, max_s = min(scores), max(scores)
        # Guard against division by zero when all scores are equal;
        # in that case every score normalizes to 0.
        range_s = max_s - min_s if max_s != min_s else 1
        return {doc_id: (score - min_s) / range_s for doc_id, score in results}

    vector_scores = normalize(vector_results)
    keyword_scores = normalize(keyword_results)

    # Combine; a document missing from one list scores 0 for that list
    all_docs = set(vector_scores.keys()) | set(keyword_scores.keys())
    combined = {}

    for doc_id in all_docs:
        v_score = vector_scores.get(doc_id, 0)
        k_score = keyword_scores.get(doc_id, 0)
        combined[doc_id] = alpha * v_score + (1 - alpha) * k_score

    return sorted(combined.items(), key=lambda x: x[1], reverse=True)
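Min-max normalization has an edge case worth seeing in numbers: the lowest-scoring hit in each list maps to 0, the same value used for documents that list never returned. A minimal sketch, with `min_max` as a hypothetical standalone copy of the inner `normalize` helper and made-up scores:

```python
from typing import Dict, List, Tuple


def min_max(results: List[Tuple[str, float]]) -> Dict[str, float]:
    """Scale scores to [0, 1]; mirrors the normalize() helper above."""
    if not results:
        return {}
    values = [score for _, score in results]
    low, high = min(values), max(values)
    span = high - low if high != low else 1
    return {doc_id: (score - low) / span for doc_id, score in results}


# BM25-style scores are unbounded, similarities are not; both end up in [0, 1].
keyword = min_max([("doc_a", 12.0), ("doc_b", 9.0), ("doc_c", 3.0)])
vector = min_max([("doc_c", 0.82)])  # a single hit

print(keyword)
print(vector)  # {'doc_c': 0.0}
```

Here `doc_c` was retrieved by both methods yet contributes 0 from each list, so it fuses to 0 regardless of `alpha`. If that matters for your data, rank-based fusion such as RRF sidesteps the issue because it ignores raw scores.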
import asyncpg
from typing import List, Dict, Optional


class PostgresHybridSearch:
    """Hybrid search with pgvector and full-text search."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def setup_schema(self):
        """Create tables and indexes."""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE EXTENSION IF NOT EXISTS vector;

                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    embedding vector(1536),
                    metadata JSONB DEFAULT '{}',
                    ts_content tsvector GENERATED ALWAYS AS (
                        to_tsvector('english', content)
                    ) STORED
                );

                -- Vector index (HNSW)
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents USING hnsw (embedding vector_cosine_ops);

                -- Full-text index (GIN)
                CREATE INDEX IF NOT EXISTS documents_fts_idx
                ON documents USING gin (ts_content);
            """)

    async def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        limit: int = 10,
        vector_weight: float = 0.5,
        filter_metadata: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Perform hybrid search combining vector and full-text.
        Uses RRF fusion for combining results.
        """
        async with self.pool.acquire() as conn:
            # Fixed parameters: $1 embedding, $2 query text,
            # $3 candidates per method (limit * 3), $4 vector weight.
            # Filter parameters are appended after these, so $4 always
            # refers to the weight.
            # Assumes a pgvector codec is registered on the connection (for
            # example with register_vector from the pgvector package);
            # otherwise pass the embedding as the text literal '[x1,x2,...]'.
            params = [query_embedding, query, limit * 3, vector_weight]

            # Build filter clause. Keys and values are bound as parameters
            # rather than interpolated into the SQL string. Values must be
            # strings because ->> returns text.
            where_clause = "1=1"
            if filter_metadata:
                for key, value in filter_metadata.items():
                    params.extend([key, value])
                    where_clause += (
                        f" AND metadata->>${len(params) - 1}::text"
                        f" = ${len(params)}::text"
                    )

            results = await conn.fetch(f"""
                WITH vector_search AS (
                    SELECT
                        id,
                        content,
                        metadata,
                        ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) as vector_rank,
                        1 - (embedding <=> $1::vector) as vector_score
                    FROM documents
                    WHERE {where_clause}
                    ORDER BY embedding <=> $1::vector
                    LIMIT $3
                ),
                keyword_search AS (
                    SELECT
                        id,
                        content,
                        metadata,
                        ROW_NUMBER() OVER (ORDER BY ts_rank(ts_content, websearch_to_tsquery('english', $2)) DESC) as keyword_rank,
                        ts_rank(ts_content, websearch_to_tsquery('english', $2)) as keyword_score
                    FROM documents
                    WHERE ts_content @@ websearch_to_tsquery('english', $2)
                        AND {where_clause}
                    ORDER BY ts_rank(ts_content, websearch_to_tsquery('english', $2)) DESC
                    LIMIT $3
                )
                SELECT
                    COALESCE(v.id, k.id) as id,
                    COALESCE(v.content, k.content) as content,
                    COALESCE(v.metadata, k.metadata) as metadata,
                    v.vector_score,
                    k.keyword_score,
                    -- RRF fusion (ROW_NUMBER is 1-based, so no extra + 1)
                    COALESCE(1.0 / (60 + v.vector_rank), 0) * $4::float +
                    COALESCE(1.0 / (60 + k.keyword_rank), 0) * (1 - $4::float) as rrf_score
                FROM vector_search v
                FULL OUTER JOIN keyword_search k ON v.id = k.id
                ORDER BY rrf_score DESC
                -- $3 is limit * 3, so this returns the requested limit
                LIMIT $3 / 3
            """, *params)

            return [dict(row) for row in results]

    async def search_with_rerank(
        self,
        query: str,
        query_embedding: List[float],
        limit: int = 10,
        rerank_candidates: int = 50
    ) -> List[Dict]:
        """Hybrid search with cross-encoder reranking."""
        from sentence_transformers import CrossEncoder

        # Get candidates
        candidates = await self.hybrid_search(
            query, query_embedding, limit=rerank_candidates
        )

        if not candidates:
            return []

        # Rerank with cross-encoder.
        # Loading the model on every call is slow, and predict() blocks the
        # event loop; in a real service, load the model once and reuse it.
        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

        pairs = [(query, c["content"]) for c in candidates]
        scores = model.predict(pairs)

        for candidate, score in zip(candidates, scores):
            candidate["rerank_score"] = float(score)

        # Sort by rerank score and return top results
        reranked = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
        return reranked[:limit]
from elasticsearch import Elasticsearch
from typing import List, Dict, Optional


class ElasticsearchHybridSearch:
    """Hybrid search with Elasticsearch and dense vectors."""

    def __init__(
        self,
        es_client: Elasticsearch,
        index_name: str = "documents"
    ):
        self.es = es_client
        self.index_name = index_name

    def create_index(self, vector_dims: int = 1536):
        """Create index with dense vector and text fields."""
        mapping = {
            "mappings": {
                "properties": {
                    "content": {
                        "type": "text",
                        "analyzer": "english"
                    },
                    "embedding": {
                        "type": "dense_vector",
                        "dims": vector_dims,
                        "index": True,
                        "similarity": "cosine"
                    },
                    "metadata": {
                        "type": "object",
                        "enabled": True
                    }
                }
            }
        }
        # Check for the index first instead of suppressing the HTTP 400 that
        # a duplicate create would return.
        if not self.es.indices.exists(index=self.index_name):
            self.es.indices.create(index=self.index_name, body=mapping)

    def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        limit: int = 10,
        boost_vector: float = 1.0,
        boost_text: float = 1.0,
        filter: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Hybrid search using Elasticsearch's built-in capabilities.

        The vector clause is an exact script_score over match_all, so it
        scores every document and every document matches the bool query.
        """
        # Build the hybrid query
        search_body = {
            "size": limit,
            "query": {
                "bool": {
                    "should": [
                        # Vector search (exact cosine similarity via script)
                        {
                            "script_score": {
                                "query": {"match_all": {}},
                                "script": {
                                    # Shift cosine similarity from [-1, 1] to
                                    # [0, 2] before boosting: script scores
                                    # must not be negative. The boost is a
                                    # script parameter, not interpolated text.
                                    "source": "(cosineSimilarity(params.query_vector, 'embedding') + 1.0) * params.boost",
                                    "params": {
                                        "query_vector": query_embedding,
                                        "boost": boost_vector
                                    }
                                }
                            }
                        },
                        # Text search (BM25)
                        {
                            "match": {
                                "content": {
                                    "query": query,
                                    "boost": boost_text
                                }
                            }
                        }
                    ],
                    "minimum_should_match": 1
                }
            }
        }

        # Add filter if provided
        if filter:
            search_body["query"]["bool"]["filter"] = filter

        response = self.es.search(index=self.index_name, body=search_body)

        return [
            {
                "id": hit["_id"],
                "content": hit["_source"]["content"],
                "metadata": hit["_source"].get("metadata", {}),
                "score": hit["_score"]
            }
            for hit in response["hits"]["hits"]
        ]

    def hybrid_search_rrf(
        self,
        query: str,
        query_embedding: List[float],
        limit: int = 10,
        window_size: int = 100
    ) -> List[Dict]:
        """
        Hybrid search using Elasticsearch 8.x RRF.

        The request shape for RRF has changed between 8.x releases, so
        verify this body against the documentation for the version you run.
        """
        search_body = {
            "size": limit,
            "sub_searches": [
                {
                    "query": {
                        "match": {
                            "content": query
                        }
                    }
                },
                {
                    "query": {
                        "knn": {
                            "field": "embedding",
                            "query_vector": query_embedding,
                            "k": window_size,
                            "num_candidates": window_size * 2
                        }
                    }
                }
            ],
            "rank": {
                "rrf": {
                    "window_size": window_size,
                    "rank_constant": 60
                }
            }
        }

        response = self.es.search(index=self.index_name, body=search_body)

        return [
            {
                "id": hit["_id"],
                "content": hit["_source"]["content"],
                "score": hit["_score"]
            }
            for hit in response["hits"]["hits"]
        ]
import asyncio
from typing import List, Dict, Optional
from dataclasses import dataclass


@dataclass
class SearchResult:
    id: str
    content: str
    score: float
    source: str  # "vector", "keyword", "hybrid"
    metadata: Optional[Dict] = None


class HybridRAGPipeline:
    """Complete hybrid search pipeline for RAG."""

    def __init__(
        self,
        vector_store,
        keyword_store,
        embedder,
        reranker=None,
        fusion_method: str = "rrf",
        vector_weight: float = 0.5
    ):
        self.vector_store = vector_store
        self.keyword_store = keyword_store
        self.embedder = embedder
        self.reranker = reranker
        self.fusion_method = fusion_method  # "rrf", anything else = linear
        self.vector_weight = vector_weight  # used by linear fusion only

    async def search(
        self,
        query: str,
        top_k: int = 10,
        filter: Optional[Dict] = None,
        use_rerank: bool = True
    ) -> List[SearchResult]:
        """Execute hybrid search pipeline."""
        # Step 1: Get query embedding
        query_embedding = self.embedder.embed(query)

        # Step 2: Execute parallel searches, over-fetching for fusion
        vector_results, keyword_results = await asyncio.gather(
            self._vector_search(query_embedding, top_k * 3, filter),
            self._keyword_search(query, top_k * 3, filter)
        )

        # Step 3: Fuse results
        if self.fusion_method == "rrf":
            fused = self._rrf_fusion(vector_results, keyword_results)
        else:
            fused = self._linear_fusion(vector_results, keyword_results)

        # Step 4: Rerank if enabled
        if use_rerank and self.reranker:
            fused = await self._rerank(query, fused[:top_k * 2])

        return fused[:top_k]

    async def _vector_search(
        self,
        embedding: List[float],
        limit: int,
        filter: Optional[Dict]
    ) -> List[SearchResult]:
        results = await self.vector_store.search(embedding, limit, filter)
        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                score=r["score"],
                source="vector",
                metadata=r.get("metadata")
            )
            for r in results
        ]

    async def _keyword_search(
        self,
        query: str,
        limit: int,
        filter: Optional[Dict]
    ) -> List[SearchResult]:
        results = await self.keyword_store.search(query, limit, filter)
        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                score=r["score"],
                source="keyword",
                metadata=r.get("metadata")
            )
            for r in results
        ]

    def _rrf_fusion(
        self,
        vector_results: List[SearchResult],
        keyword_results: List[SearchResult]
    ) -> List[SearchResult]:
        """Fuse with RRF."""
        k = 60
        scores = {}
        content_map = {}

        # enumerate() is 0-based; + 1 gives the 1-based rank RRF expects
        for rank, result in enumerate(vector_results):
            scores[result.id] = scores.get(result.id, 0) + 1 / (k + rank + 1)
            content_map[result.id] = result

        for rank, result in enumerate(keyword_results):
            scores[result.id] = scores.get(result.id, 0) + 1 / (k + rank + 1)
            if result.id not in content_map:
                content_map[result.id] = result

        return self._build_results(scores, content_map)

    def _linear_fusion(
        self,
        vector_results: List[SearchResult],
        keyword_results: List[SearchResult]
    ) -> List[SearchResult]:
        """Fuse with a weighted sum of min-max normalized scores."""
        def normalize(results: List[SearchResult]) -> Dict[str, float]:
            if not results:
                return {}
            values = [r.score for r in results]
            low, high = min(values), max(values)
            span = (high - low) or 1.0  # avoid division by zero
            return {r.id: (r.score - low) / span for r in results}

        v_scores = normalize(vector_results)
        k_scores = normalize(keyword_results)

        # Prefer the vector hit's content when both lists return a document
        content_map = {r.id: r for r in keyword_results}
        content_map.update({r.id: r for r in vector_results})

        alpha = self.vector_weight
        scores = {
            doc_id: alpha * v_scores.get(doc_id, 0.0)
            + (1 - alpha) * k_scores.get(doc_id, 0.0)
            for doc_id in content_map
        }
        return self._build_results(scores, content_map)

    @staticmethod
    def _build_results(
        scores: Dict[str, float],
        content_map: Dict[str, SearchResult]
    ) -> List[SearchResult]:
        """Turn fused scores into SearchResult objects, best first."""
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        return [
            SearchResult(
                id=doc_id,
                content=content_map[doc_id].content,
                score=scores[doc_id],
                source="hybrid",
                metadata=content_map[doc_id].metadata
            )
            for doc_id in sorted_ids
        ]

    async def _rerank(
        self,
        query: str,
        results: List[SearchResult]
    ) -> List[SearchResult]:
        """Rerank with cross-encoder."""
        if not results:
            return results

        pairs = [(query, r.content) for r in results]
        scores = self.reranker.predict(pairs)

        # Overwrites the fused score with the reranker's score
        for result, score in zip(results, scores):
            result.score = float(score)

        return sorted(results, key=lambda x: x.score, reverse=True)
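The pipeline leaves `vector_store`, `keyword_store`, and `embedder` undefined. From the calls it makes, each store needs an async `search(query_or_embedding, limit, filter)` that returns dicts with `id`, `content`, and `score`. Below is a minimal in-memory sketch of that interface, mainly useful for unit tests. The class names and the toy scoring (brute-force cosine similarity, term overlap) are assumptions for illustration and not part of the skill; `filter` is accepted but ignored.

```python
import asyncio
import math
from typing import Dict, List, Optional


class InMemoryVectorStore:
    """Toy vector store: brute-force cosine similarity over a dict."""

    def __init__(self, docs: Dict[str, str], vectors: Dict[str, List[float]]):
        self.docs = docs
        self.vectors = vectors

    async def search(
        self, embedding: List[float], limit: int, filter: Optional[Dict] = None
    ) -> List[Dict]:
        def cosine(a: List[float], b: List[float]) -> float:
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
            return dot / norm if norm else 0.0

        hits = [
            {"id": doc_id, "content": self.docs[doc_id], "score": cosine(embedding, vec)}
            for doc_id, vec in self.vectors.items()
        ]
        return sorted(hits, key=lambda h: h["score"], reverse=True)[:limit]


class InMemoryKeywordStore:
    """Toy keyword store: score = number of query terms found in the document."""

    def __init__(self, docs: Dict[str, str]):
        self.docs = docs

    async def search(
        self, query: str, limit: int, filter: Optional[Dict] = None
    ) -> List[Dict]:
        terms = set(query.lower().split())
        hits = []
        for doc_id, text in self.docs.items():
            score = len(terms & set(text.lower().split()))
            if score:
                hits.append({"id": doc_id, "content": text, "score": float(score)})
        return sorted(hits, key=lambda h: h["score"], reverse=True)[:limit]


async def demo():
    docs = {"a": "postgres full text search", "b": "vector search with pgvector"}
    vectors = {"a": [1.0, 0.0], "b": [0.0, 1.0]}
    vector_store = InMemoryVectorStore(docs, vectors)
    keyword_store = InMemoryKeywordStore(docs)
    # Both searches run concurrently, as in the pipeline's search() method.
    return await asyncio.gather(
        vector_store.search([0.1, 0.9], limit=2),
        keyword_store.search("postgres search", limit=2),
    )


vector_hits, keyword_hits = asyncio.run(demo())
print([h["id"] for h in vector_hits])
print([h["id"] for h in keyword_hits])
```

The two stores disagree on the top document here (`b` for the vector query, `a` for the keyword query), which is the situation fusion is meant to resolve.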