.agents/skills/SINGULARITY-MBP-DATAWEAVE/SKILL.md
Data pipeline for THEMANBEARPIG: SQLite→graph transforms, DuckDB analytical aggregations, LanceDB vector enrichment, Polars DataFrame ops, FTS5 search integration, incremental updates, Neo4j/GraphML/D3 export. Transforms 183+ DB tables (175K evidence, 167K authorities, 16K timeline, 5K impeachment) into 13-layer graph-ready data with quality gates, dedup, schema verification, and delta merge.
npx skillsauth add fatcrapinmybutt/cortex-osint SINGULARITY-MBP-DATAWEAVEInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
The data fabric that gives THEMANBEARPIG its nervous system. 183 tables → 13 layers → one living graph.
litigation_context.db (1.3 GB, 183+ tables)
│
├─── SQLite WAL ──────────┐
│ │
├─── DuckDB ATTACH ───────┼──→ TransformPipeline
│ (10-100× analytics) │ │
├─── LanceDB ─────────────┤ ├── NodeFactory (typed node generation)
│ (75K vectors, 384d) │ ├── LinkFactory (relationship discovery)
│ │ ├── QualityGate (dedup, schema, nulls)
└─── Polars ──────────────┘ ├── DeltaEngine (incremental updates)
(LazyFrame eval) └── ExportEngine (D3/Neo4j/GraphML/adj)
│
graph_data_v7.json
nodes.csv + edges.csv
graph.graphml
from dataclasses import dataclass, field, asdict
from typing import Optional
from enum import Enum
import hashlib
class NodeType(str, Enum):
"""All 13 layer node types for THEMANBEARPIG."""
PERSON = "PERSON"
EVIDENCE = "EVIDENCE"
AUTHORITY = "AUTHORITY"
EVENT = "EVENT"
FILING = "FILING"
WEAPON = "WEAPON" # Judicial violations, false allegations
CLAIM = "CLAIM"
IMPEACHMENT = "IMPEACHMENT"
CARTEL = "CARTEL" # Berry-McNeill intelligence
POLICE = "POLICE" # Police report nodes
FORM = "FORM" # Court forms
RULE = "RULE" # MCR/MCL rules
CONVERGENCE = "CONVERGENCE" # Convergence domain tracking
class LinkType(str, Enum):
"""Relationship types between nodes."""
SUPPORTS = "SUPPORTS"
CONTRADICTS = "CONTRADICTS"
VIOLATES = "VIOLATES"
FILED_BY = "FILED_BY"
TARGETS = "TARGETS"
CITES = "CITES"
PRECEDES = "PRECEDES"
IMPEACHES = "IMPEACHES"
CONNECTED_TO = "CONNECTED_TO"
ASSIGNED_TO = "ASSIGNED_TO"
PART_OF = "PART_OF"
MENTIONS = "MENTIONS"
CLUSTERS_WITH = "CLUSTERS_WITH"
@dataclass
class GraphNode:
"""A node in the THEMANBEARPIG graph."""
id: str
label: str
node_type: NodeType
layer: int # 0-12 (13 layers)
lane: str = "" # A, B, C, D, E, F, CRIMINAL
severity: float = 0.0 # 0-10 scale
confidence: float = 1.0 # 0-1 scale
date: str = "" # ISO date for temporal ordering
source_table: str = "" # Origin table in litigation_context.db
source_id: Optional[int] = None
metadata: dict = field(default_factory=dict)
@property
def hash_id(self) -> str:
"""Deterministic ID from type + label for dedup."""
raw = f"{self.node_type.value}::{self.label}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
@dataclass
class GraphLink:
"""A link between two nodes."""
source: str # Source node ID
target: str # Target node ID
link_type: LinkType
weight: float = 1.0 # Link strength for D3 force
lane: str = ""
metadata: dict = field(default_factory=dict)
@property
def id(self) -> str:
return f"{self.source}→{self.link_type.value}→{self.target}"
import sqlite3
from pathlib import Path
from contextlib import contextmanager
DB_PATH = Path(r"C:\Users\andre\LitigationOS\litigation_context.db")
PRAGMAS = """
PRAGMA busy_timeout = 60000;
PRAGMA journal_mode = WAL;
PRAGMA cache_size = -32000;
PRAGMA temp_store = MEMORY;
PRAGMA synchronous = NORMAL;
PRAGMA mmap_size = 268435456;
"""
@contextmanager
def get_connection(db_path: Path = DB_PATH, read_only: bool = True):
"""Safe SQLite connection with mandatory PRAGMAs."""
uri = f"file:///{db_path}?mode=ro" if read_only else f"file:///{db_path}"
conn = sqlite3.connect(uri if read_only else str(db_path), uri=read_only)
conn.row_factory = sqlite3.Row
for pragma in PRAGMAS.strip().split("\n"):
if pragma.strip():
try:
conn.execute(pragma.strip())
except sqlite3.OperationalError:
pass # Some PRAGMAs may fail on read-only
try:
yield conn
finally:
conn.close()
def verify_schema(conn: sqlite3.Connection, table: str) -> set[str]:
"""MANDATORY schema verification before any query (Rule 16)."""
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
if not rows:
raise ValueError(f"Table '{table}' does not exist or has no columns")
return {row["name"] for row in rows}
class DataWeavePipeline:
"""
Master pipeline: 183+ tables → typed nodes + links → graph export.
Usage:
pipeline = DataWeavePipeline()
graph = pipeline.build_full_graph()
pipeline.export_d3(graph, "graph_data_v7.json")
"""
# Table → (NodeType, layer_index, transform_function_name)
TABLE_REGISTRY = {
"evidence_quotes": (NodeType.EVIDENCE, 0, "_transform_evidence"),
"authority_chains_v2": (NodeType.AUTHORITY, 1, "_transform_authorities"),
"timeline_events": (NodeType.EVENT, 2, "_transform_timeline"),
"impeachment_matrix": (NodeType.IMPEACHMENT, 3, "_transform_impeachment"),
"contradiction_map": (NodeType.EVIDENCE, 4, "_transform_contradictions"),
"judicial_violations": (NodeType.WEAPON, 5, "_transform_violations"),
"berry_mcneill_intelligence":(NodeType.CARTEL, 6, "_transform_cartel"),
"police_reports": (NodeType.POLICE, 7, "_transform_police"),
"filing_readiness": (NodeType.FILING, 8, "_transform_filings"),
"michigan_rules_extracted": (NodeType.RULE, 9, "_transform_rules"),
"claims": (NodeType.CLAIM, 10, "_transform_claims"),
"master_citations": (NodeType.AUTHORITY, 11, "_transform_citations"),
"convergence_domains": (NodeType.CONVERGENCE, 12, "_transform_convergence"),
}
def __init__(self, db_path: Path = DB_PATH):
self.db_path = db_path
self._nodes: dict[str, GraphNode] = {}
self._links: list[GraphLink] = []
self._persons: dict[str, str] = {} # name → node_id for person dedup
def build_full_graph(self) -> dict:
"""Execute all transforms and return complete graph."""
with get_connection(self.db_path) as conn:
# Phase 1: Extract persons (shared across all layers)
self._extract_persons(conn)
# Phase 2: Transform each table
for table, (node_type, layer, method_name) in self.TABLE_REGISTRY.items():
columns = verify_schema(conn, table)
method = getattr(self, method_name, None)
if method:
try:
method(conn, columns, layer)
except Exception as e:
print(f"[WARN] Transform failed for {table}: {e}")
# Phase 3: Cross-table link discovery
self._discover_cross_links(conn)
return {
"nodes": [asdict(n) for n in self._nodes.values()],
"links": [asdict(l) for l in self._links],
"meta": {
"node_count": len(self._nodes),
"link_count": len(self._links),
"layers": 13,
"tables_processed": len(self.TABLE_REGISTRY),
},
}
def _get_or_create_person(self, name: str, layer: int = 0) -> str:
"""Dedup person nodes — one per unique name across all layers."""
normalized = name.strip().title()
if normalized in self._persons:
return self._persons[normalized]
node_id = f"PERSON_{hashlib.sha256(normalized.encode()).hexdigest()[:12]}"
self._nodes[node_id] = GraphNode(
id=node_id,
label=normalized,
node_type=NodeType.PERSON,
layer=layer,
source_table="persons",
)
self._persons[normalized] = node_id
return node_id
def _transform_evidence(self, conn, columns: set, layer: int):
"""
evidence_quotes → EVIDENCE nodes.
Key columns: quote_text, source_file, page_number, category,
lane, bates_number, confidence, is_duplicate,
actor, target_entity
"""
# CRITICAL: filter is_duplicate = 0 (Quality Gate)
has_dup = "is_duplicate" in columns
has_lane = "lane" in columns
has_category = "category" in columns
has_confidence = "confidence" in columns
has_actor = "actor" in columns
has_target = "target_entity" in columns
has_bates = "bates_number" in columns
has_source = "source_file" in columns
has_page = "page_number" in columns
dup_filter = "WHERE is_duplicate = 0" if has_dup else ""
# Sample top evidence per lane for graph (full dataset too large)
# Strategy: top N per lane by confidence, plus all HIGH severity
sql = f"""
SELECT rowid, quote_text,
{'"lane"' if has_lane else "NULL AS lane"},
{'"category"' if has_category else "NULL AS category"},
{'"confidence"' if has_confidence else "1.0 AS confidence"},
{'"actor"' if has_actor else "NULL AS actor"},
{'"target_entity"' if has_target else "NULL AS target_entity"},
{'"bates_number"' if has_bates else "NULL AS bates_number"},
{'"source_file"' if has_source else "NULL AS source_file"},
{'"page_number"' if has_page else "NULL AS page_number"}
FROM evidence_quotes
{dup_filter}
ORDER BY {"confidence DESC," if has_confidence else ""} rowid DESC
LIMIT 5000
"""
rows = conn.execute(sql).fetchall()
for row in rows:
node_id = f"EV_{row['rowid']}"
quote = (row["quote_text"] or "")[:200] # Truncate for label
lane = row["lane"] or ""
category = row["category"] or "general"
self._nodes[node_id] = GraphNode(
id=node_id,
label=quote,
node_type=NodeType.EVIDENCE,
layer=layer,
lane=lane,
confidence=float(row["confidence"] or 1.0),
source_table="evidence_quotes",
source_id=row["rowid"],
metadata={
"category": category,
"bates": row["bates_number"] or "",
"source_file": row["source_file"] or "",
"page": row["page_number"],
},
)
# Create actor → evidence links
if row["actor"]:
actor_id = self._get_or_create_person(row["actor"], layer)
self._links.append(GraphLink(
source=actor_id, target=node_id,
link_type=LinkType.MENTIONS,
lane=lane,
))
# Create evidence → target links
if row["target_entity"]:
target_id = self._get_or_create_person(row["target_entity"], layer)
self._links.append(GraphLink(
source=node_id, target=target_id,
link_type=LinkType.TARGETS,
lane=lane,
))
def _transform_authorities(self, conn, columns: set, layer: int):
"""
authority_chains_v2 → AUTHORITY nodes + CITES links.
Key columns: primary_citation, supporting_citation, relationship,
source_document, source_type, lane, paragraph_context
"""
has_primary = "primary_citation" in columns
has_supporting = "supporting_citation" in columns
has_relationship = "relationship" in columns
has_lane = "lane" in columns
has_source_type = "source_type" in columns
if not has_primary:
return # Schema mismatch — skip gracefully
# Extract unique citations as nodes
sql = """
SELECT primary_citation, COUNT(*) as ref_count,
GROUP_CONCAT(DISTINCT lane) as lanes
FROM authority_chains_v2
WHERE primary_citation IS NOT NULL
AND primary_citation != ''
GROUP BY primary_citation
ORDER BY ref_count DESC
LIMIT 2000
"""
rows = conn.execute(sql).fetchall()
citation_nodes = {}
for row in rows:
cite = row["primary_citation"]
node_id = f"AUTH_{hashlib.sha256(cite.encode()).hexdigest()[:12]}"
citation_nodes[cite] = node_id
# Determine authority sub-type from citation prefix
auth_subtype = _classify_citation(cite)
self._nodes[node_id] = GraphNode(
id=node_id,
label=cite,
node_type=NodeType.AUTHORITY,
layer=layer,
severity=min(row["ref_count"] / 10.0, 10.0), # Normalize to 0-10
source_table="authority_chains_v2",
metadata={
"ref_count": row["ref_count"],
"lanes": row["lanes"] or "",
"auth_type": auth_subtype,
},
)
# Extract citation chains as CITES links
if has_supporting:
chain_sql = """
SELECT primary_citation, supporting_citation, relationship, lane
FROM authority_chains_v2
WHERE primary_citation IN ({})
AND supporting_citation IS NOT NULL
AND supporting_citation != ''
LIMIT 10000
""".format(",".join("?" * len(citation_nodes)))
chain_rows = conn.execute(chain_sql, list(citation_nodes.keys())).fetchall()
for row in chain_rows:
src = row["primary_citation"]
tgt = row["supporting_citation"]
if src in citation_nodes and tgt in citation_nodes:
self._links.append(GraphLink(
source=citation_nodes[src],
target=citation_nodes[tgt],
link_type=LinkType.CITES,
lane=row["lane"] or "",
metadata={"relationship": row["relationship"] or ""},
))
def _classify_citation(cite: str) -> str:
"""Classify citation into authority sub-type."""
cite_upper = cite.upper().strip()
if cite_upper.startswith("MCR"):
return "court_rule"
elif cite_upper.startswith("MCL"):
return "statute"
elif cite_upper.startswith("MRE"):
return "evidence_rule"
elif "USC" in cite_upper or "U.S.C." in cite_upper:
return "federal_statute"
elif "MICH" in cite_upper or "MICH APP" in cite_upper:
return "mi_case_law"
elif "US " in cite_upper or "S.CT." in cite_upper or "S. CT." in cite_upper:
return "scotus"
elif "F.3D" in cite_upper or "F.4TH" in cite_upper or "F. SUPP" in cite_upper:
return "federal_case_law"
elif "CONST" in cite_upper or "AMEND" in cite_upper:
return "constitution"
else:
return "other"
def _transform_timeline(self, conn, columns: set, layer: int):
"""
timeline_events → EVENT nodes + PRECEDES links.
Key columns: event_date, event_description, actor, lane,
category, source_document
"""
has_date = "event_date" in columns
has_desc = "event_description" in columns
has_actor = "actor" in columns
has_lane = "lane" in columns
has_category = "category" in columns
if not has_date:
return
sql = f"""
SELECT rowid, event_date,
{'"event_description"' if has_desc else "NULL AS event_description"},
{'"actor"' if has_actor else "NULL AS actor"},
{'"lane"' if has_lane else "NULL AS lane"},
{'"category"' if has_category else "NULL AS category"}
FROM timeline_events
WHERE event_date IS NOT NULL
AND event_date != ''
ORDER BY event_date ASC
LIMIT 5000
"""
rows = conn.execute(sql).fetchall()
prev_node_id = None
for row in rows:
node_id = f"TL_{row['rowid']}"
desc = (row["event_description"] or "")[:150]
lane = row["lane"] or ""
self._nodes[node_id] = GraphNode(
id=node_id,
label=desc,
node_type=NodeType.EVENT,
layer=layer,
lane=lane,
date=row["event_date"] or "",
source_table="timeline_events",
source_id=row["rowid"],
metadata={"category": row["category"] or ""},
)
# Actor → event links
if row["actor"]:
for actor_name in row["actor"].split(","):
actor_name = actor_name.strip()
if actor_name:
actor_id = self._get_or_create_person(actor_name, layer)
self._links.append(GraphLink(
source=actor_id, target=node_id,
link_type=LinkType.MENTIONS, lane=lane,
))
# Temporal PRECEDES chain (same lane only)
if prev_node_id and lane == (self._nodes.get(prev_node_id, GraphNode(
id="", label="", node_type=NodeType.EVENT, layer=0
)).lane):
self._links.append(GraphLink(
source=prev_node_id, target=node_id,
link_type=LinkType.PRECEDES, lane=lane,
weight=0.3, # Light weight — temporal, not causal
))
prev_node_id = node_id
def _transform_impeachment(self, conn, columns: set, layer: int):
"""
impeachment_matrix → IMPEACHMENT nodes + IMPEACHES links.
Key columns: category, evidence_summary, source_file, quote_text,
impeachment_value (1-10), cross_exam_question,
filing_relevance, event_date, target
"""
has_value = "impeachment_value" in columns
has_target = "target" in columns
has_quote = "quote_text" in columns
has_category = "category" in columns
has_xexam = "cross_exam_question" in columns
value_col = "impeachment_value" if has_value else "5"
target_col = '"target"' if has_target else "NULL"
sql = f"""
SELECT rowid, {'"category"' if has_category else "NULL AS category"},
evidence_summary,
{'"quote_text"' if has_quote else "NULL AS quote_text"},
{value_col} AS severity,
{target_col} AS target,
{'"cross_exam_question"' if has_xexam else "NULL AS cross_exam_question"}
FROM impeachment_matrix
ORDER BY {value_col} DESC
LIMIT 3000
"""
rows = conn.execute(sql).fetchall()
for row in rows:
node_id = f"IMP_{row['rowid']}"
label = (row["evidence_summary"] or row.get("quote_text") or "")[:150]
severity = float(row["severity"] or 5)
self._nodes[node_id] = GraphNode(
id=node_id,
label=label,
node_type=NodeType.IMPEACHMENT,
layer=layer,
severity=severity,
source_table="impeachment_matrix",
source_id=row["rowid"],
metadata={
"category": row["category"] or "",
"cross_exam": row["cross_exam_question"] or "",
},
)
# Target → impeachment link
if row["target"]:
target_id = self._get_or_create_person(row["target"], layer)
self._links.append(GraphLink(
source=node_id, target=target_id,
link_type=LinkType.IMPEACHES,
weight=severity / 10.0,
))
def _transform_contradictions(self, conn, columns: set, layer: int):
"""
contradiction_map → CONTRADICTS links between existing nodes.
Key columns: claim_id, source_a, source_b, contradiction_text,
severity, lane
"""
has_source_a = "source_a" in columns
has_source_b = "source_b" in columns
has_severity = "severity" in columns
has_lane = "lane" in columns
has_text = "contradiction_text" in columns
if not (has_source_a and has_source_b):
return
sql = f"""
SELECT rowid, source_a, source_b,
{'"contradiction_text"' if has_text else "NULL AS contradiction_text"},
{'"severity"' if has_severity else "'medium' AS severity"},
{'"lane"' if has_lane else "NULL AS lane"}
FROM contradiction_map
LIMIT 2500
"""
rows = conn.execute(sql).fetchall()
severity_map = {"critical": 10, "high": 8, "medium": 5, "low": 3}
for row in rows:
# Create contradiction nodes (they are evidence of inconsistency)
node_id = f"CONTRA_{row['rowid']}"
text = (row["contradiction_text"] or "")[:150]
sev = row["severity"] or "medium"
weight = severity_map.get(sev.lower(), 5) / 10.0
self._nodes[node_id] = GraphNode(
id=node_id,
label=text,
node_type=NodeType.EVIDENCE,
layer=layer,
severity=severity_map.get(sev.lower(), 5),
lane=row["lane"] or "",
source_table="contradiction_map",
source_id=row["rowid"],
metadata={"source_a": row["source_a"], "source_b": row["source_b"]},
)
# Person A → contradiction link
person_a_id = self._get_or_create_person(row["source_a"], layer)
self._links.append(GraphLink(
source=person_a_id, target=node_id,
link_type=LinkType.CONTRADICTS, weight=weight,
lane=row["lane"] or "",
))
# Person B → contradiction link
person_b_id = self._get_or_create_person(row["source_b"], layer)
self._links.append(GraphLink(
source=person_b_id, target=node_id,
link_type=LinkType.CONTRADICTS, weight=weight,
lane=row["lane"] or "",
))
def _transform_violations(self, conn, columns: set, layer: int):
"""
judicial_violations → WEAPON nodes + VIOLATES links.
Violation types: ex_parte, benchbook, mcr_2003, procedural,
ppo_weaponization, due_process, evidence_exclusion
"""
has_type = "violation_type" in columns
has_desc = "description" in columns
has_judge = "judge" in columns
has_date = "violation_date" in columns
has_severity = "severity" in columns
type_col = '"violation_type"' if has_type else "'unknown'"
desc_col = '"description"' if has_desc else "NULL"
judge_col = '"judge"' if has_judge else "NULL"
date_col = '"violation_date"' if has_date else "NULL"
sev_col = '"severity"' if has_severity else "5"
sql = f"""
SELECT rowid, {type_col} AS vtype, {desc_col} AS description,
{judge_col} AS judge, {date_col} AS violation_date,
{sev_col} AS severity
FROM judicial_violations
ORDER BY {sev_col} DESC
LIMIT 2000
"""
rows = conn.execute(sql).fetchall()
# Violation type weights (higher = worse)
VIOLATION_WEIGHTS = {
"ex_parte": 9.0,
"benchbook": 6.0,
"mcr_2003": 8.0,
"procedural": 5.0,
"ppo_weaponization": 8.5,
"due_process": 9.5,
"evidence_exclusion": 7.0,
}
for row in rows:
node_id = f"JV_{row['rowid']}"
vtype = row["vtype"] or "unknown"
desc = (row["description"] or vtype)[:150]
severity = float(row["severity"] or VIOLATION_WEIGHTS.get(vtype, 5.0))
self._nodes[node_id] = GraphNode(
id=node_id,
label=desc,
node_type=NodeType.WEAPON,
layer=layer,
severity=severity,
date=row["violation_date"] or "",
source_table="judicial_violations",
source_id=row["rowid"],
metadata={"violation_type": vtype},
)
# Judge → VIOLATES link
if row["judge"]:
judge_id = self._get_or_create_person(row["judge"], layer)
self._links.append(GraphLink(
source=judge_id, target=node_id,
link_type=LinkType.VIOLATES,
weight=severity / 10.0,
))
def _transform_cartel(self, conn, columns: set, layer: int):
"""
berry_mcneill_intelligence → CARTEL nodes + CONNECTED_TO links.
Maps the three-court judicial cartel:
McNeill + Hoopes + Ladas-Hoopes = former partners at Ladas, Hoopes & McNeill.
"""
has_person_a = "person_a" in columns
has_person_b = "person_b" in columns
has_connection = "connection_type" in columns
has_evidence = "evidence_text" in columns
# Adaptive column selection
col_a = '"person_a"' if has_person_a else '"actor"' if "actor" in columns else "NULL"
col_b = '"person_b"' if has_person_b else '"target"' if "target" in columns else "NULL"
col_conn = '"connection_type"' if has_connection else "'cartel'"
col_ev = '"evidence_text"' if has_evidence else "NULL"
sql = f"""
SELECT rowid, {col_a} AS person_a, {col_b} AS person_b,
{col_conn} AS connection_type, {col_ev} AS evidence_text
FROM berry_mcneill_intelligence
LIMIT 500
"""
rows = conn.execute(sql).fetchall()
for row in rows:
if not row["person_a"] or not row["person_b"]:
continue
node_id = f"CARTEL_{row['rowid']}"
evidence = (row["evidence_text"] or row["connection_type"] or "")[:150]
self._nodes[node_id] = GraphNode(
id=node_id,
label=evidence,
node_type=NodeType.CARTEL,
layer=layer,
severity=8.0, # Cartel connections are always high severity
source_table="berry_mcneill_intelligence",
source_id=row["rowid"],
metadata={"connection_type": row["connection_type"] or ""},
)
pa_id = self._get_or_create_person(row["person_a"], layer)
pb_id = self._get_or_create_person(row["person_b"], layer)
self._links.append(GraphLink(
source=pa_id, target=node_id,
link_type=LinkType.CONNECTED_TO, weight=0.9,
))
self._links.append(GraphLink(
source=pb_id, target=node_id,
link_type=LinkType.CONNECTED_TO, weight=0.9,
))
def _transform_police(self, conn, columns: set, layer: int):
"""police_reports → POLICE evidence nodes. NSPD reports — zero arrests."""
has_case = "case_number" in columns
has_date = "report_date" in columns
has_summary = "summary" in columns
has_officer = "officer" in columns
sql = f"""
SELECT rowid,
{'"case_number"' if has_case else "NULL AS case_number"},
{'"report_date"' if has_date else "NULL AS report_date"},
{'"summary"' if has_summary else "NULL AS summary"},
{'"officer"' if has_officer else "NULL AS officer"}
FROM police_reports
LIMIT 500
"""
rows = conn.execute(sql).fetchall()
for row in rows:
node_id = f"PR_{row['rowid']}"
label = row["case_number"] or row["summary"] or f"Police Report #{row['rowid']}"
self._nodes[node_id] = GraphNode(
id=node_id,
label=str(label)[:150],
node_type=NodeType.POLICE,
layer=layer,
date=row["report_date"] or "",
source_table="police_reports",
source_id=row["rowid"],
metadata={
"case_number": row["case_number"] or "",
"officer": row["officer"] or "",
},
)
if row["officer"]:
officer_id = self._get_or_create_person(row["officer"], layer)
self._links.append(GraphLink(
source=officer_id, target=node_id,
link_type=LinkType.FILED_BY,
))
def _transform_filings(self, conn, columns: set, layer: int):
"""filing_readiness → FILING nodes with confidence/status."""
has_name = "vehicle_name" in columns
has_status = "status" in columns
has_confidence = "confidence" in columns
has_lane = "lane" in columns
name_col = '"vehicle_name"' if has_name else '"filing_name"' if "filing_name" in columns else "NULL"
status_col = '"status"' if has_status else "'unknown'"
conf_col = '"confidence"' if has_confidence else "0.5"
sql = f"""
SELECT rowid, {name_col} AS name, {status_col} AS status,
{conf_col} AS confidence,
{'"lane"' if has_lane else "NULL AS lane"}
FROM filing_readiness
LIMIT 100
"""
rows = conn.execute(sql).fetchall()
for row in rows:
if not row["name"]:
continue
node_id = f"FILING_{row['rowid']}"
self._nodes[node_id] = GraphNode(
id=node_id,
label=str(row["name"]),
node_type=NodeType.FILING,
layer=layer,
lane=row["lane"] or "",
confidence=float(row["confidence"] or 0.5),
source_table="filing_readiness",
source_id=row["rowid"],
metadata={"status": row["status"] or "unknown"},
)
def _transform_rules(self, conn, columns: set, layer: int):
"""michigan_rules_extracted → RULE nodes for the authority layer."""
has_citation = "citation" in columns
has_title = "title" in columns
has_text = "rule_text" in columns
if not has_citation:
# Try alternate column names
if "rule_citation" in columns:
cite_col = "rule_citation"
elif "rule_number" in columns:
cite_col = "rule_number"
else:
return
else:
cite_col = "citation"
sql = f"""
SELECT rowid, "{cite_col}" AS citation,
{'"title"' if has_title else "NULL AS title"},
{f'SUBSTR("{("rule_text" if has_text else "text" if "text" in columns else "content")}", 1, 200)' if (has_text or "text" in columns or "content" in columns) else "NULL"} AS excerpt
FROM michigan_rules_extracted
WHERE "{cite_col}" IS NOT NULL AND "{cite_col}" != ''
LIMIT 2000
"""
try:
rows = conn.execute(sql).fetchall()
except sqlite3.OperationalError:
return
for row in rows:
cite = row["citation"]
node_id = f"RULE_{hashlib.sha256(str(cite).encode()).hexdigest()[:12]}"
self._nodes[node_id] = GraphNode(
id=node_id,
label=str(cite),
node_type=NodeType.RULE,
layer=layer,
source_table="michigan_rules_extracted",
source_id=row["rowid"],
metadata={"title": row["title"] or ""},
)
def _transform_claims(self, conn, columns: set, layer: int):
"""claims → CLAIM nodes with status and vehicle mapping."""
has_name = "claim_name" in columns or "claim_id" in columns
if not has_name:
return
name_col = "claim_name" if "claim_name" in columns else "claim_id"
sql = f"""
SELECT rowid, "{name_col}" AS name,
{"status" if "status" in columns else "'active'"} AS status,
{"vehicle_name" if "vehicle_name" in columns else "NULL"} AS vehicle,
{"lane" if "lane" in columns else "NULL"} AS lane
FROM claims LIMIT 500
"""
try:
rows = conn.execute(sql).fetchall()
except sqlite3.OperationalError:
return
for row in rows:
node_id = f"CLAIM_{row['rowid']}"
self._nodes[node_id] = GraphNode(
id=node_id, label=str(row["name"] or ""),
node_type=NodeType.CLAIM, layer=layer,
lane=row["lane"] or "",
source_table="claims", source_id=row["rowid"],
metadata={"status": row["status"], "vehicle": row["vehicle"] or ""},
)
def _transform_citations(self, conn, columns: set, layer: int):
"""master_citations → supplementary AUTHORITY nodes."""
has_cite = "citation" in columns
if not has_cite:
return
sql = """
SELECT rowid, citation, COUNT(*) OVER () AS total
FROM master_citations
WHERE citation IS NOT NULL AND citation != ''
GROUP BY citation
ORDER BY COUNT(*) DESC
LIMIT 1000
"""
try:
rows = conn.execute(sql).fetchall()
except sqlite3.OperationalError:
return
for row in rows:
cite = row["citation"]
node_id = f"MCITE_{hashlib.sha256(str(cite).encode()).hexdigest()[:12]}"
if node_id not in self._nodes:
self._nodes[node_id] = GraphNode(
id=node_id, label=str(cite),
node_type=NodeType.AUTHORITY, layer=layer,
source_table="master_citations", source_id=row["rowid"],
)
def _transform_convergence(self, conn, columns: set, layer: int):
"""convergence_domains → CONVERGENCE status nodes."""
has_name = "domain_name" in columns
has_status = "status" in columns
if not has_name:
return
sql = """
SELECT rowid, domain_name, category_name, status, wave_id,
actual_rows, target_rows
FROM convergence_domains
LIMIT 200
"""
try:
rows = conn.execute(sql).fetchall()
except sqlite3.OperationalError:
return
status_severity = {"RED": 2, "YELLOW": 5, "GREEN": 9}
for row in rows:
node_id = f"CONV_{row['rowid']}"
self._nodes[node_id] = GraphNode(
id=node_id, label=row["domain_name"] or "",
node_type=NodeType.CONVERGENCE, layer=layer,
severity=status_severity.get(row["status"] or "", 2),
source_table="convergence_domains", source_id=row["rowid"],
metadata={
"category": row["category_name"] or "",
"status": row["status"] or "",
"wave": row["wave_id"] or "",
"progress": f"{row['actual_rows'] or 0}/{row['target_rows'] or 0}",
},
)
def _extract_persons(self, conn):
"""Pre-extract known persons from key tables for consistent dedup."""
KNOWN_PERSONS = [
"Andrew Pigors", "Emily Watson", "Jenny McNeill",
"Kenneth Hoopes", "Maria Ladas-Hoopes", "Pamela Rusco",
"Ronald Berry", "Cavan Berry", "Albert Watson",
"Lori Watson", "Jennifer Barnes",
]
for name in KNOWN_PERSONS:
self._get_or_create_person(name, layer=0)
def _discover_cross_links(self, conn):
"""
Find connections between nodes on different layers.
Uses shared entity names, dates, and case references.
"""
# Strategy 1: Same-lane links across layers
lane_buckets: dict[str, list[str]] = {}
for nid, node in self._nodes.items():
if node.lane:
lane_buckets.setdefault(node.lane, []).append(nid)
for lane, node_ids in lane_buckets.items():
# Connect filings to their evidence within same lane
filings = [n for n in node_ids if self._nodes[n].node_type == NodeType.FILING]
evidence = [n for n in node_ids if self._nodes[n].node_type == NodeType.EVIDENCE]
for f_id in filings:
for e_id in evidence[:50]: # Cap per filing
self._links.append(GraphLink(
source=e_id, target=f_id,
link_type=LinkType.SUPPORTS,
weight=0.2, lane=lane,
))
# Strategy 2: Authority → Claim links
for nid, node in self._nodes.items():
if node.node_type == NodeType.CLAIM:
vehicle = node.metadata.get("vehicle", "")
if vehicle:
for aid, anode in self._nodes.items():
if anode.node_type == NodeType.AUTHORITY and vehicle in anode.label:
self._links.append(GraphLink(
source=aid, target=nid,
link_type=LinkType.SUPPORTS, weight=0.6,
))
import duckdb
def get_duckdb_conn(db_path: Path = DB_PATH) -> duckdb.DuckDBPyConnection:
"""
Create DuckDB connection with SQLite scanner ATTACH.
10-100× faster for analytical queries vs SQLite.
"""
conn = duckdb.connect(":memory:")
conn.execute("INSTALL sqlite; LOAD sqlite;")
conn.execute(f"ATTACH '{db_path}' AS lit (TYPE SQLITE, READ_ONLY);")
return conn
def compute_evidence_aggregates(duck: duckdb.DuckDBPyConnection) -> dict:
"""
Aggregate evidence counts per person, per lane, per category.
Used for node sizing in D3 — bigger nodes = more evidence.
"""
# Evidence per actor (for PERSON node radius)
actor_counts = duck.execute("""
SELECT actor, lane, category,
COUNT(*) AS evidence_count,
AVG(CAST(confidence AS DOUBLE)) AS avg_confidence
FROM lit.evidence_quotes
WHERE actor IS NOT NULL
AND (is_duplicate = 0 OR is_duplicate IS NULL)
GROUP BY actor, lane, category
ORDER BY evidence_count DESC
LIMIT 5000
""").fetchdf()
# Evidence per lane (for lane-level aggregation)
lane_counts = duck.execute("""
SELECT lane,
COUNT(*) AS total,
COUNT(DISTINCT actor) AS unique_actors,
COUNT(DISTINCT category) AS unique_categories
FROM lit.evidence_quotes
WHERE is_duplicate = 0 OR is_duplicate IS NULL
GROUP BY lane
ORDER BY total DESC
""").fetchdf()
# Impeachment severity distribution per target
impeachment_dist = duck.execute("""
SELECT target,
COUNT(*) AS total_items,
AVG(CAST(impeachment_value AS DOUBLE)) AS avg_severity,
MAX(CAST(impeachment_value AS DOUBLE)) AS max_severity,
SUM(CASE WHEN CAST(impeachment_value AS INT) >= 8 THEN 1 ELSE 0 END) AS critical_count
FROM lit.impeachment_matrix
WHERE target IS NOT NULL
GROUP BY target
ORDER BY avg_severity DESC
LIMIT 100
""").fetchdf()
return {
"actor_counts": actor_counts.to_dict("records"),
"lane_counts": lane_counts.to_dict("records"),
"impeachment_dist": impeachment_dist.to_dict("records"),
}
def discover_relationships_duckdb(duck: duckdb.DuckDBPyConnection) -> list[dict]:
"""
Cross-table JOINs to find hidden relationships.
DuckDB handles these 10-100× faster than SQLite.
"""
# Find persons who appear in BOTH impeachment AND evidence
cross_refs = duck.execute("""
WITH impeachment_targets AS (
SELECT DISTINCT target AS person FROM lit.impeachment_matrix
WHERE target IS NOT NULL
),
evidence_actors AS (
SELECT DISTINCT actor AS person FROM lit.evidence_quotes
WHERE actor IS NOT NULL
),
violation_judges AS (
SELECT DISTINCT judge AS person FROM lit.judicial_violations
WHERE judge IS NOT NULL
)
SELECT
COALESCE(i.person, e.person, v.person) AS person,
CASE WHEN i.person IS NOT NULL THEN 1 ELSE 0 END AS in_impeachment,
CASE WHEN e.person IS NOT NULL THEN 1 ELSE 0 END AS in_evidence,
CASE WHEN v.person IS NOT NULL THEN 1 ELSE 0 END AS in_violations
FROM impeachment_targets i
FULL OUTER JOIN evidence_actors e ON LOWER(i.person) = LOWER(e.person)
FULL OUTER JOIN violation_judges v ON LOWER(COALESCE(i.person, e.person)) = LOWER(v.person)
WHERE COALESCE(i.person, e.person, v.person) IS NOT NULL
ORDER BY (CASE WHEN i.person IS NOT NULL THEN 1 ELSE 0 END
+ CASE WHEN e.person IS NOT NULL THEN 1 ELSE 0 END
+ CASE WHEN v.person IS NOT NULL THEN 1 ELSE 0 END) DESC
""").fetchdf()
return cross_refs.to_dict("records")
def compute_temporal_patterns(duck: duckdb.DuckDBPyConnection) -> list[dict]:
"""
Detect escalation patterns using window functions.
Clusters events by time proximity and calculates inter-event gaps.
"""
patterns = duck.execute("""
WITH dated_events AS (
SELECT
rowid,
event_date,
actor,
lane,
event_description,
TRY_CAST(event_date AS DATE) AS parsed_date
FROM lit.timeline_events
WHERE event_date IS NOT NULL
AND TRY_CAST(event_date AS DATE) IS NOT NULL
),
windowed AS (
SELECT *,
LAG(parsed_date) OVER (PARTITION BY lane ORDER BY parsed_date) AS prev_date,
LEAD(parsed_date) OVER (PARTITION BY lane ORDER BY parsed_date) AS next_date,
ROW_NUMBER() OVER (PARTITION BY lane ORDER BY parsed_date) AS event_seq,
COUNT(*) OVER (PARTITION BY lane) AS lane_total
FROM dated_events
)
SELECT
lane,
parsed_date,
actor,
SUBSTR(event_description, 1, 100) AS description,
event_seq,
lane_total,
DATEDIFF('day', prev_date, parsed_date) AS days_since_prev,
CASE
WHEN DATEDIFF('day', prev_date, parsed_date) <= 1 THEN 'BURST'
WHEN DATEDIFF('day', prev_date, parsed_date) <= 7 THEN 'CLUSTER'
WHEN DATEDIFF('day', prev_date, parsed_date) <= 30 THEN 'NORMAL'
ELSE 'GAP'
END AS tempo_class
FROM windowed
WHERE days_since_prev IS NOT NULL
ORDER BY lane, parsed_date
LIMIT 10000
""").fetchdf()
return patterns.to_dict("records")
def build_convergence_pivot(duck: duckdb.DuckDBPyConnection) -> dict:
"""
Pivot convergence_domains by category × status for dashboard display.
"""
pivot = duck.execute("""
SELECT
category_name,
COUNT(*) FILTER (WHERE status = 'RED') AS red_count,
COUNT(*) FILTER (WHERE status = 'YELLOW') AS yellow_count,
COUNT(*) FILTER (WHERE status = 'GREEN') AS green_count,
COUNT(*) AS total,
ROUND(COUNT(*) FILTER (WHERE status = 'GREEN') * 100.0 / COUNT(*), 1) AS pct_complete
FROM lit.convergence_domains
GROUP BY category_name
ORDER BY pct_complete ASC
""").fetchdf()
# Lane × table coverage matrix
lane_coverage = duck.execute("""
SELECT lane,
COUNT(DISTINCT category) AS categories_covered,
COUNT(*) AS total_evidence,
COUNT(DISTINCT source_file) AS unique_sources
FROM lit.evidence_quotes
WHERE lane IS NOT NULL AND lane != ''
AND (is_duplicate = 0 OR is_duplicate IS NULL)
GROUP BY lane
ORDER BY total_evidence DESC
""").fetchdf()
return {
"convergence_pivot": pivot.to_dict("records"),
"lane_coverage": lane_coverage.to_dict("records"),
}
import lancedb
import numpy as np
LANCEDB_PATH = Path(r"C:\Users\andre\LitigationOS\00_SYSTEM\engines\semantic\lancedb_store")
def get_lancedb():
"""Connect to the LanceDB vector store (75K 384-dim vectors)."""
db = lancedb.connect(str(LANCEDB_PATH))
tables = db.table_names()
if not tables:
raise RuntimeError("No LanceDB tables found — semantic engine may not be initialized")
return db, db.open_table(tables[0])
def compute_semantic_clusters(table, query_texts: list[str], top_k: int = 20) -> list[dict]:
"""
Find semantically similar evidence for clustering on the graph.
Returns groups of related nodes that should be positioned close together.
"""
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
clusters = []
for query in query_texts:
embedding = model.encode(query).tolist()
results = table.search(embedding).limit(top_k).to_pandas()
cluster = {
"query": query,
"members": [],
}
for _, row in results.iterrows():
cluster["members"].append({
"text": row.get("text", "")[:200],
"distance": float(row.get("_distance", 0)),
"source": row.get("source", ""),
})
clusters.append(cluster)
return clusters
def generate_position_hints(table, nodes: list[GraphNode]) -> dict[str, tuple[float, float]]:
"""
Use vector embeddings to generate (x, y) position hints for D3.
Semantically similar nodes get close initial positions via t-SNE.
"""
from sklearn.manifold import TSNE
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
# Encode node labels
labels = [n.label[:200] for n in nodes if n.label]
if len(labels) < 5:
return {}
embeddings = model.encode(labels, batch_size=64, show_progress_bar=False)
# t-SNE reduction to 2D
perplexity = min(30, len(labels) - 1)
tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42, n_iter=500)
coords = tsne.fit_transform(embeddings)
# Normalize to [0, 1000] range for D3 viewBox
x_min, x_max = coords[:, 0].min(), coords[:, 0].max()
y_min, y_max = coords[:, 1].min(), coords[:, 1].max()
x_range = x_max - x_min or 1
y_range = y_max - y_min or 1
hints = {}
for i, node in enumerate(nodes):
if i < len(coords) and node.label:
hints[node.id] = (
float((coords[i, 0] - x_min) / x_range * 1000),
float((coords[i, 1] - y_min) / y_range * 1000),
)
return hints
def find_near_duplicates(table, threshold: float = 0.15) -> list[tuple[str, str, float]]:
"""
Find near-duplicate evidence by vector similarity.
Pairs with distance < threshold are likely duplicates.
Used to draw CLUSTERS_WITH links on the dedup layer.
"""
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
# Sample evidence texts
df = table.to_pandas()
if len(df) == 0:
return []
sample = df.head(2000)
texts = sample.get("text", sample.iloc[:, 0]).tolist()
embeddings = model.encode(texts, batch_size=64, show_progress_bar=False)
# Cosine similarity matrix (batch computation)
from sklearn.metrics.pairwise import cosine_similarity
sim_matrix = cosine_similarity(embeddings)
duplicates = []
for i in range(len(sim_matrix)):
for j in range(i + 1, len(sim_matrix)):
dist = 1.0 - sim_matrix[i][j]
if dist < threshold:
duplicates.append((
texts[i][:100],
texts[j][:100],
float(sim_matrix[i][j]),
))
return sorted(duplicates, key=lambda x: -x[2])[:500]
import polars as pl
def build_node_dataframe(nodes: dict[str, GraphNode]) -> pl.LazyFrame:
"""
Convert nodes dict to Polars LazyFrame for efficient manipulation.
LazyFrame defers execution until .collect() — zero memory waste.
"""
records = []
for nid, node in nodes.items():
records.append({
"id": node.id,
"label": node.label,
"node_type": node.node_type.value,
"layer": node.layer,
"lane": node.lane,
"severity": node.severity,
"confidence": node.confidence,
"date": node.date,
"source_table": node.source_table,
})
return pl.LazyFrame(records)
def aggregate_nodes_by_type(lf: pl.LazyFrame) -> pl.DataFrame:
"""Per-type statistics for the HUD gauge display."""
return (
lf
.group_by("node_type")
.agg([
pl.count().alias("count"),
pl.col("severity").mean().alias("avg_severity"),
pl.col("confidence").mean().alias("avg_confidence"),
pl.col("lane").n_unique().alias("unique_lanes"),
pl.col("layer").n_unique().alias("unique_layers"),
])
.sort("count", descending=True)
.collect()
)
def compute_lane_statistics(lf: pl.LazyFrame) -> pl.DataFrame:
"""Per-lane breakdown for the filing readiness gauge."""
return (
lf
.filter(pl.col("lane") != "")
.group_by("lane")
.agg([
pl.count().alias("total_nodes"),
pl.col("node_type").n_unique().alias("type_diversity"),
pl.col("severity").max().alias("max_severity"),
pl.col("severity").mean().alias("avg_severity"),
])
.sort("total_nodes", descending=True)
.collect()
)
def group_events_by_period(lf: pl.LazyFrame, period: str = "1mo") -> pl.DataFrame:
"""
Group EVENT nodes by time period for the timeline scrubber.
period: "1d", "1w", "1mo", "3mo", "1y"
"""
return (
lf
.filter(
(pl.col("node_type") == "EVENT") &
(pl.col("date") != "") &
(pl.col("date").is_not_null())
)
.with_columns(
pl.col("date").str.to_date("%Y-%m-%d", strict=False).alias("parsed_date")
)
.filter(pl.col("parsed_date").is_not_null())
.group_by_dynamic("parsed_date", every=period)
.agg([
pl.count().alias("event_count"),
pl.col("lane").n_unique().alias("active_lanes"),
pl.col("severity").max().alias("peak_severity"),
pl.col("id").first().alias("first_event_id"),
])
.sort("parsed_date")
.collect()
)
def normalize_link_weights(links: list[GraphLink]) -> list[GraphLink]:
"""
Normalize link weights to [0.1, 1.0] range using Polars.
Prevents visual clutter from extreme weight values in D3.
"""
if not links:
return links
weights = [l.weight for l in links]
df = pl.DataFrame({"weight": weights})
stats = df.select([
pl.col("weight").min().alias("min_w"),
pl.col("weight").max().alias("max_w"),
]).row(0)
min_w, max_w = stats
w_range = max_w - min_w if max_w != min_w else 1.0
for link in links:
link.weight = 0.1 + 0.9 * (link.weight - min_w) / w_range
return links
import re
def sanitize_fts5(query: str) -> str:
"""Mandatory FTS5 sanitization per Rule 15."""
return re.sub(r'[^\w\s*"]', ' ', query).strip()
def search_to_subgraph(
conn: sqlite3.Connection,
query: str,
tables: list[str] = None,
limit: int = 100,
) -> tuple[list[str], list[dict]]:
"""
Search FTS5 indexes → return matching node IDs + highlighted snippets.
Used to highlight search results on the live graph.
"""
if tables is None:
tables = ["evidence_fts", "timeline_fts", "md_sections_fts"]
safe_query = sanitize_fts5(query)
if not safe_query:
return [], []
matched_ids = []
snippets = []
for fts_table in tables:
# Map FTS table to content table and node prefix
content_map = {
"evidence_fts": ("evidence_quotes", "EV_"),
"timeline_fts": ("timeline_events", "TL_"),
"md_sections_fts": ("md_sections", "MD_"),
}
content_table, prefix = content_map.get(fts_table, (fts_table, "UNK_"))
try:
rows = conn.execute(f"""
SELECT rowid, snippet({fts_table}, 0, '<mark>', '</mark>', '...', 40) AS snip,
rank
FROM {fts_table}
WHERE {fts_table} MATCH ?
ORDER BY rank
LIMIT ?
""", (safe_query, limit)).fetchall()
for row in rows:
node_id = f"{prefix}{row['rowid']}"
matched_ids.append(node_id)
snippets.append({
"node_id": node_id,
"snippet": row["snip"],
"rank": row["rank"],
"source": fts_table,
})
except sqlite3.OperationalError:
# FTS5 error → LIKE fallback (Rule 15)
try:
like_query = f"%{safe_query}%"
# Determine text column adaptively
cols = verify_schema(conn, content_table)
text_col = next(
(c for c in ["quote_text", "event_description", "content", "text"]
if c in cols), None
)
if not text_col:
continue
rows = conn.execute(f"""
SELECT rowid, SUBSTR("{text_col}", 1, 200) AS snip
FROM {content_table}
WHERE "{text_col}" LIKE ?
LIMIT ?
""", (like_query, limit)).fetchall()
for row in rows:
node_id = f"{prefix}{row['rowid']}"
matched_ids.append(node_id)
snippets.append({
"node_id": node_id,
"snippet": row["snip"],
"rank": 0,
"source": f"{content_table} (LIKE fallback)",
})
except sqlite3.OperationalError:
continue
return matched_ids, snippets
def extract_subgraph(
graph: dict,
focus_node_ids: list[str],
depth: int = 2,
) -> dict:
"""
Extract a subgraph centered on focus nodes, up to N hops.
Used when the user searches — shows the neighborhood of matching nodes.
"""
# Build adjacency index
adj: dict[str, set[str]] = {}
for link in graph["links"]:
src = link["source"]
tgt = link["target"]
adj.setdefault(src, set()).add(tgt)
adj.setdefault(tgt, set()).add(src)
# BFS expansion from focus nodes
visited = set(focus_node_ids)
frontier = set(focus_node_ids)
for _ in range(depth):
next_frontier = set()
for node_id in frontier:
for neighbor in adj.get(node_id, set()):
if neighbor not in visited:
visited.add(neighbor)
next_frontier.add(neighbor)
frontier = next_frontier
# Filter nodes and links
node_index = {n["id"]: n for n in graph["nodes"]}
sub_nodes = [node_index[nid] for nid in visited if nid in node_index]
sub_links = [
l for l in graph["links"]
if l["source"] in visited and l["target"] in visited
]
return {
"nodes": sub_nodes,
"links": sub_links,
"meta": {
"focus_count": len(focus_node_ids),
"expanded_count": len(visited),
"depth": depth,
},
}
class QualityGate:
"""
Validates data BEFORE graph construction.
Every violation is logged but non-fatal — degrade gracefully.
"""
def __init__(self):
self.violations: list[dict] = []
def check_dedup(self, conn, table: str = "evidence_quotes") -> int:
"""Verify dedup filter — count duplicates that would slip through."""
cols = verify_schema(conn, table)
if "is_duplicate" not in cols:
self.violations.append({
"table": table, "check": "dedup",
"message": "No is_duplicate column — cannot filter duplicates",
"severity": "WARN",
})
return 0
dup_count = conn.execute(
"SELECT COUNT(*) FROM evidence_quotes WHERE is_duplicate = 1"
).fetchone()[0]
return dup_count
def check_schema(self, conn, table: str, required_cols: list[str]) -> bool:
"""Verify required columns exist (Rule 16)."""
actual = verify_schema(conn, table)
missing = set(required_cols) - actual
if missing:
self.violations.append({
"table": table, "check": "schema",
"message": f"Missing columns: {missing}",
"severity": "ERROR",
})
return False
return True
def check_nulls(self, conn, table: str, col: str) -> float:
"""Measure NULL rate for a column — flag if > 50%."""
total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
nulls = conn.execute(
f'SELECT COUNT(*) FROM {table} WHERE "{col}" IS NULL OR "{col}" = ""'
).fetchone()[0]
rate = nulls / total if total > 0 else 0
if rate > 0.5:
self.violations.append({
"table": table, "check": "nulls",
"message": f"{col}: {rate:.1%} NULL rate",
"severity": "WARN",
})
return rate
def check_lane_validity(self, conn) -> list[str]:
"""Verify all lane values are valid."""
VALID_LANES = {"A", "B", "C", "D", "E", "F", "CRIMINAL", "", None}
cols = verify_schema(conn, "evidence_quotes")
if "lane" not in cols:
return []
invalid = conn.execute("""
SELECT DISTINCT lane FROM evidence_quotes
WHERE lane IS NOT NULL AND lane NOT IN ('A','B','C','D','E','F','CRIMINAL','')
LIMIT 50
""").fetchall()
bad_lanes = [r[0] for r in invalid]
if bad_lanes:
self.violations.append({
"table": "evidence_quotes", "check": "lane_validity",
"message": f"Invalid lanes: {bad_lanes[:10]}",
"severity": "WARN",
})
return bad_lanes
def report(self) -> dict:
"""Generate quality report."""
errors = [v for v in self.violations if v["severity"] == "ERROR"]
warns = [v for v in self.violations if v["severity"] == "WARN"]
return {
"pass": len(errors) == 0,
"errors": len(errors),
"warnings": len(warns),
"violations": self.violations,
}
def validate_graph(graph: dict) -> dict:
"""
Validate the built graph before export.
Catches orphan nodes, dangling links, and degenerate structures.
"""
node_ids = {n["id"] for n in graph["nodes"]}
issues = []
# Check for dangling links (referencing non-existent nodes)
dangling = 0
for link in graph["links"]:
if link["source"] not in node_ids:
dangling += 1
if link["target"] not in node_ids:
dangling += 1
if dangling:
issues.append(f"{dangling} dangling link endpoints")
# Check for orphan nodes (no links)
linked_nodes = set()
for link in graph["links"]:
linked_nodes.add(link["source"])
linked_nodes.add(link["target"])
orphans = node_ids - linked_nodes
if len(orphans) > len(node_ids) * 0.3:
issues.append(f"{len(orphans)} orphan nodes ({len(orphans)/len(node_ids):.0%})")
# Check for self-loops
self_loops = sum(1 for l in graph["links"] if l["source"] == l["target"])
if self_loops:
issues.append(f"{self_loops} self-loops")
# Check node type distribution (at least 3 types)
types = {n["node_type"] for n in graph["nodes"]}
if len(types) < 3:
issues.append(f"Only {len(types)} node types — expected 3+")
return {
"valid": len(issues) == 0,
"node_count": len(node_ids),
"link_count": len(graph["links"]),
"orphan_count": len(orphans),
"self_loops": self_loops,
"dangling_links": dangling,
"node_types": len(types),
"issues": issues,
}
class DeltaEngine:
"""
Detect new/changed rows since last graph build.
Uses rowid tracking — no need for timestamps in every table.
"""
STATE_FILE = Path(r"C:\Users\andre\LitigationOS\00_SYSTEM\engines\mbp\delta_state.json")
def __init__(self):
self._state = self._load_state()
def _load_state(self) -> dict:
if self.STATE_FILE.exists():
import orjson
return orjson.loads(self.STATE_FILE.read_bytes())
return {}
def _save_state(self):
import orjson
self.STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
self.STATE_FILE.write_bytes(orjson.dumps(self._state, option=orjson.OPT_INDENT_2))
def get_new_rows(self, conn, table: str) -> tuple[int, int]:
"""
Returns (last_seen_rowid, max_rowid).
Query for rows WHERE rowid > last_seen_rowid to get deltas.
"""
last_seen = self._state.get(table, 0)
max_rowid = conn.execute(f"SELECT MAX(rowid) FROM {table}").fetchone()[0] or 0
return last_seen, max_rowid
def mark_processed(self, table: str, max_rowid: int):
"""Mark table as processed up to max_rowid."""
self._state[table] = max_rowid
self._save_state()
def needs_update(self, conn, table: str) -> bool:
"""Check if table has new rows since last build."""
last_seen, max_rowid = self.get_new_rows(conn, table)
return max_rowid > last_seen
def get_delta_query(self, table: str, columns: str = "*") -> str:
"""Generate SQL for delta rows only."""
last_seen = self._state.get(table, 0)
return f"SELECT {columns} FROM {table} WHERE rowid > {last_seen}"
def merge_incremental(
existing_graph: dict,
new_nodes: list[dict],
new_links: list[dict],
) -> dict:
"""
Merge new nodes/links into existing graph without full rebuild.
Conflict resolution:
- Same node ID → update metadata, keep higher severity
- Same link source+target+type → update weight, keep latest
- New nodes/links → append directly
"""
# Index existing
node_index = {n["id"]: n for n in existing_graph["nodes"]}
link_index = {}
for link in existing_graph["links"]:
key = f"{link['source']}→{link['link_type']}→{link['target']}"
link_index[key] = link
# Merge nodes
for node in new_nodes:
nid = node["id"]
if nid in node_index:
# Conflict: keep higher severity, merge metadata
existing = node_index[nid]
existing["severity"] = max(existing.get("severity", 0), node.get("severity", 0))
existing["confidence"] = node.get("confidence", existing.get("confidence", 1.0))
if node.get("metadata"):
existing.setdefault("metadata", {}).update(node["metadata"])
else:
node_index[nid] = node
# Merge links
for link in new_links:
key = f"{link['source']}→{link['link_type']}→{link['target']}"
if key in link_index:
# Conflict: update weight
link_index[key]["weight"] = link.get("weight", link_index[key]["weight"])
else:
link_index[key] = link
return {
"nodes": list(node_index.values()),
"links": list(link_index.values()),
"meta": {
"node_count": len(node_index),
"link_count": len(link_index),
"merged_nodes": len(new_nodes),
"merged_links": len(new_links),
},
}
import orjson
def export_d3(graph: dict, output_path: str = "graph_data_v7.json"):
"""
Export to D3.js force-directed graph format.
This is THE primary export consumed by THEMANBEARPIG HTML.
"""
# D3 requires source/target as node IDs (strings)
# Add visual properties for each node type
NODE_COLORS = {
"PERSON": "#ff6b6b",
"EVIDENCE": "#4ecdc4",
"AUTHORITY": "#45b7d1",
"EVENT": "#96ceb4",
"FILING": "#ffeaa7",
"WEAPON": "#ff4757",
"CLAIM": "#dfe6e9",
"IMPEACHMENT": "#e17055",
"CARTEL": "#d63031",
"POLICE": "#0984e3",
"FORM": "#6c5ce7",
"RULE": "#00b894",
"CONVERGENCE": "#fdcb6e",
}
NODE_RADII = {
"PERSON": 12,
"EVIDENCE": 4,
"AUTHORITY": 6,
"EVENT": 5,
"FILING": 10,
"WEAPON": 8,
"CLAIM": 7,
"IMPEACHMENT": 7,
"CARTEL": 9,
"POLICE": 6,
"FORM": 5,
"RULE": 5,
"CONVERGENCE": 6,
}
d3_graph = {
"nodes": [],
"links": [],
"meta": graph.get("meta", {}),
"layer_meta": {
0: {"name": "Evidence", "color": "#4ecdc4"},
1: {"name": "Authority", "color": "#45b7d1"},
2: {"name": "Timeline", "color": "#96ceb4"},
3: {"name": "Impeachment", "color": "#e17055"},
4: {"name": "Contradictions", "color": "#fd79a8"},
5: {"name": "Judicial Violations", "color": "#ff4757"},
6: {"name": "Cartel Intelligence", "color": "#d63031"},
7: {"name": "Police Reports", "color": "#0984e3"},
8: {"name": "Filing Pipeline", "color": "#ffeaa7"},
9: {"name": "Court Rules", "color": "#00b894"},
10: {"name": "Claims", "color": "#dfe6e9"},
11: {"name": "Citations", "color": "#74b9ff"},
12: {"name": "Convergence", "color": "#fdcb6e"},
},
}
for node in graph["nodes"]:
ntype = node.get("node_type", "EVIDENCE")
d3_node = {
"id": node["id"],
"label": node.get("label", "")[:80], # Truncate for rendering
"fullLabel": node.get("label", ""),
"type": ntype,
"layer": node.get("layer", 0),
"lane": node.get("lane", ""),
"severity": node.get("severity", 0),
"confidence": node.get("confidence", 1.0),
"date": node.get("date", ""),
"color": NODE_COLORS.get(ntype, "#95a5a6"),
"radius": NODE_RADII.get(ntype, 5),
"sourceTable": node.get("source_table", ""),
"metadata": node.get("metadata", {}),
}
# Scale radius by severity for WEAPON/IMPEACHMENT nodes
if ntype in ("WEAPON", "IMPEACHMENT") and node.get("severity", 0) > 7:
d3_node["radius"] *= 1.5
d3_graph["nodes"].append(d3_node)
for link in graph["links"]:
d3_graph["links"].append({
"source": link["source"],
"target": link["target"],
"type": link.get("link_type", "CONNECTED_TO"),
"weight": link.get("weight", 1.0),
"lane": link.get("lane", ""),
})
# Write with orjson for speed (10× faster than json.dumps)
output = Path(output_path)
output.write_bytes(orjson.dumps(d3_graph, option=orjson.OPT_INDENT_2))
print(f"[EXPORT] D3 graph: {output} ({len(d3_graph['nodes'])} nodes, {len(d3_graph['links'])} links)")
return output
import csv
def export_neo4j(graph: dict, output_dir: str = "neo4j_export"):
"""
Export for Neo4j Bloom visualization.
Format: Neo4j admin import CSV format.
"""
out = Path(output_dir)
out.mkdir(parents=True, exist_ok=True)
# nodes.csv — Neo4j import format
nodes_path = out / "nodes.csv"
with open(nodes_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow([
":ID", "label", ":LABEL", "layer:int", "lane",
"severity:float", "confidence:float", "date", "source_table",
])
for node in graph["nodes"]:
writer.writerow([
node["id"],
(node.get("label") or "")[:200],
node.get("node_type", "EVIDENCE"),
node.get("layer", 0),
node.get("lane", ""),
node.get("severity", 0),
node.get("confidence", 1.0),
node.get("date", ""),
node.get("source_table", ""),
])
# edges.csv — Neo4j import format
edges_path = out / "edges.csv"
with open(edges_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow([":START_ID", ":END_ID", ":TYPE", "weight:float", "lane"])
for link in graph["links"]:
writer.writerow([
link["source"],
link["target"],
link.get("link_type", "CONNECTED_TO"),
link.get("weight", 1.0),
link.get("lane", ""),
])
print(f"[EXPORT] Neo4j CSV: {nodes_path}, {edges_path}")
return nodes_path, edges_path
def export_graphml(graph: dict, output_path: str = "litigation_graph.graphml"):
"""Export to GraphML for interchange with Gephi, yEd, etc."""
import xml.etree.ElementTree as ET
graphml = ET.Element("graphml", xmlns="http://graphml.graphstruct.org/graphml")
# Define attribute keys
keys = [
("node_type", "node", "string"),
("label", "node", "string"),
("layer", "node", "int"),
("lane", "node", "string"),
("severity", "node", "double"),
("link_type", "edge", "string"),
("weight", "edge", "double"),
]
for attr_name, domain, attr_type in keys:
ET.SubElement(graphml, "key", {
"id": attr_name, "for": domain,
"attr.name": attr_name, "attr.type": attr_type,
})
g = ET.SubElement(graphml, "graph", id="G", edgedefault="directed")
# Nodes
for node in graph["nodes"]:
n = ET.SubElement(g, "node", id=node["id"])
for attr in ["node_type", "label", "layer", "lane", "severity"]:
d = ET.SubElement(n, "data", key=attr)
d.text = str(node.get(attr, ""))
# Edges
for i, link in enumerate(graph["links"]):
e = ET.SubElement(g, "edge", {
"id": f"e{i}",
"source": link["source"],
"target": link["target"],
})
ET.SubElement(e, "data", key="link_type").text = link.get("link_type", "")
ET.SubElement(e, "data", key="weight").text = str(link.get("weight", 1.0))
tree = ET.ElementTree(graphml)
ET.indent(tree, space=" ")
tree.write(output_path, encoding="unicode", xml_declaration=True)
print(f"[EXPORT] GraphML: {output_path}")
return Path(output_path)
def export_adjacency_list(graph: dict, output_path: str = "graph_adj.json") -> Path:
"""
Export as adjacency list for NetworkX graph analysis.
Format: {node_id: [neighbor_id, ...]}
"""
adj: dict[str, list[str]] = {}
for link in graph["links"]:
adj.setdefault(link["source"], []).append(link["target"])
adj.setdefault(link["target"], []).append(link["source"])
out = Path(output_path)
out.write_bytes(orjson.dumps(adj, option=orjson.OPT_INDENT_2))
print(f"[EXPORT] Adjacency list: {out} ({len(adj)} nodes)")
return out
def build_themanbearpig(
db_path: Path = DB_PATH,
output_dir: Path = Path(r"C:\Users\andre\LitigationOS\build"),
incremental: bool = False,
include_vectors: bool = False,
) -> dict:
"""
Master build function — orchestrates the complete DATAWEAVE pipeline.
Args:
db_path: Path to litigation_context.db
output_dir: Where to write exports
incremental: If True, only process new rows since last build
include_vectors: If True, compute LanceDB vector enrichment (slower)
Returns:
Build report with counts and timing
"""
import time
start = time.perf_counter()
output_dir.mkdir(parents=True, exist_ok=True)
# Phase 1: Quality gates
print("[1/7] Running quality gates...")
qg = QualityGate()
with get_connection(db_path) as conn:
qg.check_dedup(conn)
qg.check_lane_validity(conn)
for table in ["evidence_quotes", "timeline_events", "impeachment_matrix"]:
try:
verify_schema(conn, table)
except ValueError:
pass
quality_report = qg.report()
if not quality_report["pass"]:
print(f"[WARN] Quality gate issues: {quality_report['errors']} errors, {quality_report['warnings']} warnings")
# Phase 2: SQLite transforms
print("[2/7] Transforming SQLite tables → graph nodes/links...")
pipeline = DataWeavePipeline(db_path)
if incremental:
delta = DeltaEngine()
# Only transform tables with new rows
with get_connection(db_path) as conn:
for table in pipeline.TABLE_REGISTRY:
if delta.needs_update(conn, table):
print(f" → Delta detected in {table}")
graph = pipeline.build_full_graph()
# Phase 3: DuckDB analytics
print("[3/7] Running DuckDB analytical transforms...")
try:
duck = get_duckdb_conn(db_path)
aggregates = compute_evidence_aggregates(duck)
temporal = compute_temporal_patterns(duck)
convergence = build_convergence_pivot(duck)
duck.close()
# Inject aggregates into node metadata
actor_map = {a["actor"]: a for a in aggregates.get("actor_counts", [])}
for node in graph["nodes"]:
if node["node_type"] == "PERSON" and node["label"] in actor_map:
node["metadata"]["evidence_count"] = actor_map[node["label"]]["evidence_count"]
graph["meta"]["analytics"] = {
"lane_counts": aggregates.get("lane_counts", []),
"convergence": convergence,
}
except Exception as e:
print(f"[WARN] DuckDB analytics failed (non-fatal): {e}")
# Phase 4: Vector enrichment (optional)
if include_vectors:
print("[4/7] Computing LanceDB vector enrichment...")
try:
_, lance_table = get_lancedb()
hints = generate_position_hints(lance_table, list(pipeline._nodes.values())[:1000])
for nid, (x, y) in hints.items():
for node in graph["nodes"]:
if node["id"] == nid:
node["metadata"]["hint_x"] = x
node["metadata"]["hint_y"] = y
break
except Exception as e:
print(f"[WARN] Vector enrichment skipped: {e}")
else:
print("[4/7] Vector enrichment skipped (use include_vectors=True)")
# Phase 5: Normalize link weights
print("[5/7] Normalizing link weights...")
raw_links = [GraphLink(**l) if isinstance(l, dict) else l for l in pipeline._links]
normalize_link_weights(raw_links)
# Phase 6: Validate
print("[6/7] Validating graph...")
validation = validate_graph(graph)
if not validation["valid"]:
print(f"[WARN] Validation issues: {validation['issues']}")
# Phase 7: Export all formats
print("[7/7] Exporting to all formats...")
d3_path = export_d3(graph, str(output_dir / "graph_data_v7.json"))
neo4j_nodes, neo4j_edges = export_neo4j(graph, str(output_dir / "neo4j_export"))
graphml_path = export_graphml(graph, str(output_dir / "litigation_graph.graphml"))
adj_path = export_adjacency_list(graph, str(output_dir / "graph_adj.json"))
elapsed = time.perf_counter() - start
report = {
"status": "SUCCESS",
"elapsed_seconds": round(elapsed, 2),
"nodes": graph["meta"]["node_count"],
"links": graph["meta"]["link_count"],
"quality": quality_report,
"validation": validation,
"exports": {
"d3": str(d3_path),
"neo4j_nodes": str(neo4j_nodes),
"neo4j_edges": str(neo4j_edges),
"graphml": str(graphml_path),
"adjacency": str(adj_path),
},
}
print(f"\n{'='*60}")
print(f"THEMANBEARPIG BUILD COMPLETE")
print(f" Nodes: {report['nodes']:,}")
print(f" Links: {report['links']:,}")
print(f" Time: {report['elapsed_seconds']}s")
print(f" Quality: {'PASS' if quality_report['pass'] else 'WARN'}")
print(f" Valid: {'PASS' if validation['valid'] else 'WARN'}")
print(f"{'='*60}")
return report
# Entry point
if __name__ == "__main__":
build_themanbearpig(include_vectors=False)
| Source Table | Node Type | Layer | Key Columns | Link Types Generated |
|---|---|---|---|---|
| evidence_quotes | EVIDENCE | 0 | quote_text, actor, target_entity, lane, confidence | MENTIONS, TARGETS |
| authority_chains_v2 | AUTHORITY | 1 | primary_citation, supporting_citation, relationship | CITES |
| timeline_events | EVENT | 2 | event_date, event_description, actor, lane | MENTIONS, PRECEDES |
| impeachment_matrix | IMPEACHMENT | 3 | evidence_summary, impeachment_value, target | IMPEACHES |
| contradiction_map | EVIDENCE | 4 | source_a, source_b, contradiction_text, severity | CONTRADICTS |
| judicial_violations | WEAPON | 5 | violation_type, description, judge, severity | VIOLATES |
| berry_mcneill_intelligence | CARTEL | 6 | person_a, person_b, connection_type | CONNECTED_TO |
| police_reports | POLICE | 7 | case_number, report_date, officer, summary | FILED_BY |
| filing_readiness | FILING | 8 | vehicle_name, status, confidence, lane | SUPPORTS |
| michigan_rules_extracted | RULE | 9 | citation, title, rule_text | — |
| claims | CLAIM | 10 | claim_name, status, vehicle_name, lane | SUPPORTS |
| master_citations | AUTHORITY | 11 | citation | — |
| convergence_domains | CONVERGENCE | 12 | domain_name, status, category_name, wave_id | — |
| Operation | SQLite | DuckDB | Speedup | |---|---|---|---| | COUNT(*) across 5 tables | ~800ms | ~15ms | 53× | | GROUP BY actor with 175K rows | ~2.1s | ~45ms | 47× | | Window functions (LAG/LEAD) | ~3.5s | ~80ms | 44× | | Cross-table JOIN (3 tables) | ~5.2s | ~120ms | 43× | | PIVOT convergence domains | ~1.8s | ~25ms | 72× | | Full graph build (all tables) | ~45s | ~8s (hybrid) | 5.6× |
| Layer | Table Source | Est. Nodes (sampled) | Est. Links | |---|---|---|---| | 0 Evidence | evidence_quotes | 5,000 | 8,000 | | 1 Authority | authority_chains_v2 | 2,000 | 10,000 | | 2 Timeline | timeline_events | 5,000 | 5,000 | | 3 Impeachment | impeachment_matrix | 3,000 | 3,000 | | 4 Contradictions | contradiction_map | 2,500 | 5,000 | | 5 Violations | judicial_violations | 2,000 | 2,000 | | 6 Cartel | berry_mcneill_intelligence | 500 | 1,000 | | 7 Police | police_reports | 356 | 356 | | 8 Filings | filing_readiness | 100 | 500 | | 9 Rules | michigan_rules_extracted | 2,000 | — | | 10 Claims | claims | 500 | 500 | | 11 Citations | master_citations | 1,000 | — | | 12 Convergence | convergence_domains | 200 | — | | TOTAL | | ~24,156 | ~35,356 |
Full tables are too large for interactive visualization (175K evidence nodes would crash D3). Sampling strategy:
| Strategy | When | Implementation |
|---|---|---|
| Top-N by severity | WEAPON, IMPEACHMENT | ORDER BY severity DESC LIMIT 3000 |
| Top-N by confidence | EVIDENCE | ORDER BY confidence DESC LIMIT 5000 |
| Top-N by ref_count | AUTHORITY | GROUP BY citation ORDER BY COUNT(*) DESC LIMIT 2000 |
| All rows | POLICE, FILING, CONVERGENCE | Tables are small enough (< 500 rows) |
| Temporal sampling | EVENT | Top 5000 by date spread |
| Semantic clustering | All (optional) | LanceDB similarity → representative samples |
For deep-dive views, the user can zoom into a subgraph and DATAWEAVE will fetch unsampled detail on demand via extract_subgraph().
Production schemas WILL differ from DDL. DATAWEAVE handles this with adaptive column selection:
# Pattern: check → adapt → query
cols = verify_schema(conn, "evidence_quotes")
actor_col = '"actor"' if "actor" in cols else '"source_actor"' if "source_actor" in cols else "NULL"
sql = f"SELECT rowid, {actor_col} AS actor FROM evidence_quotes LIMIT 10"
Every transform function in DATAWEAVE follows this pattern. Never hardcode column names without verification.
tools
Transcendent system design and engine architecture for LitigationOS. Use when: designing engines, Go concurrent systems, Rust CLI tools, performance optimization, clean code practices, SOLID principles, architecture decisions, system design patterns, engine fleet management, daemon architecture, connection pooling, thread safety, error recovery, circuit breakers, graceful degradation.
documentation
Transcendent litigation warfare system for LitigationOS. Use when: evidence hunting, adversary profiling, custody analysis, MCL 722.23 factors, impeachment prep, contradiction detection, deadline tracking, docket management, case operations, best interest analysis, parental alienation documentation, false allegation rebuttal, credibility assessment, witness preparation, deposition strategy.
development
Transcendent judicial intelligence and misconduct documentation system. Use when: judge profiling, McNeill analysis, Hoopes analysis, misconduct patterns, bias indicators, ex parte violations, JTC complaints, judicial violation tracking, benchbook deviations, canon violations, ruling pattern analysis, cartel intelligence, Berry-McNeill connections, recusal grounds, DuckDB analytics on judicial data.
devops
Transcendent data engineering and database mastery for LitigationOS. Use when: SQL queries, DuckDB analytics, LanceDB vectors, Polars DataFrames, FTS5 search, RAG pipelines, SQLite optimization, schema design, data migration, cross-database federation, vector embeddings, semantic search, indexing strategy, query optimization, connection pooling, WAL mode, PRAGMA tuning, batch operations.