skills/ai-detecting-anomalies/SKILL.md
Detect fraud, unusual behavior, and anomalies in events or transactions using AI. Use when detecting fraud, flagging suspicious transactions, anomaly detection in logs, spotting unusual user behavior, abuse detection, identifying outliers in data, suspicious activity monitoring, fraud scoring, unusual pattern detection, flagging account takeover attempts, detecting bot traffic, abnormal usage patterns, security event triage, risk scoring with AI.
npx skillsauth add lebsral/dspy-programming-not-prompting-lms-skills ai-detecting-anomalies

Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
Build an AI anomaly detector with DSPy - define what normal looks like, score events for severity, route by risk level, and explain findings to human reviewers.
Ask the user:
The answers determine severity thresholds, routing logic, and how much baseline context to include.
Summarize historical "normal" behavior into a compact string the LM can reason against. A good baseline summary describes typical patterns, ranges, and context.
```python
import dspy

lm = dspy.LM("openai/gpt-4o-mini")  # or "anthropic/claude-sonnet-4-5-20250929", etc.
dspy.configure(lm=lm)


class SummarizeBaseline(dspy.Signature):
    """Summarize the normal behavior pattern from historical event data.
    Produce a concise description of what typical events look like - include
    typical values, frequencies, time patterns, and common attributes."""

    historical_events: str = dspy.InputField(
        desc="Sample of recent normal events, formatted as JSON or CSV"
    )
    baseline_summary: str = dspy.OutputField(
        desc="Concise description of normal behavior - typical ranges, patterns, and context"
    )


baseline_summarizer = dspy.ChainOfThought(SummarizeBaseline)
```
For many use cases, the baseline summary can be constructed once per period (daily, hourly) and cached rather than recomputed per event.
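One way to cache per period is sketched below. `PeriodBaselineCache` and `fake_summarize` are hypothetical names introduced only for illustration; in practice the `summarize` callable would presumably wrap `baseline_summarizer(historical_events=...).baseline_summary`. The sketch assumes hourly or daily buckets in UTC.

```python
from datetime import datetime, timezone
from typing import Callable, Optional


class PeriodBaselineCache:
    """Recompute the baseline summary at most once per hour or day (hypothetical helper)."""

    def __init__(self, summarize: Callable[[str], str], period: str = "hour"):
        if period not in ("hour", "day"):
            raise ValueError("period must be 'hour' or 'day'")
        self._summarize = summarize
        # The formatted timestamp acts as the cache key for the current bucket.
        self._fmt = "%Y-%m-%dT%H" if period == "hour" else "%Y-%m-%d"
        self._bucket: Optional[str] = None
        self._summary: Optional[str] = None

    def get(self, historical_events: str, now: Optional[datetime] = None) -> str:
        """Return the cached summary, rebuilding it when the time bucket changes."""
        now = now or datetime.now(timezone.utc)
        bucket = now.strftime(self._fmt)
        if bucket != self._bucket:
            self._summary = self._summarize(historical_events)
            self._bucket = bucket
        return self._summary


# Stand-in summarizer so the sketch runs without an LM call.
calls = []


def fake_summarize(history: str) -> str:
    calls.append(history)
    return f"summary #{len(calls)}"


cache = PeriodBaselineCache(fake_summarize, period="hour")
ten_am = datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc)
print(cache.get("recent events", now=ten_am))                     # summary #1
print(cache.get("recent events", now=ten_am.replace(minute=50)))  # summary #1 (same hour)
print(cache.get("recent events", now=ten_am.replace(hour=11)))    # summary #2 (new hour)
```

Keying on a formatted timestamp keeps the cache logic independent of how the summary itself is produced.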
The core signature takes an event and the baseline summary, and outputs a severity level with a specific explanation.
```python
from typing import Literal

SEVERITY_LEVELS = ["normal", "low", "medium", "high", "critical"]


class ScoreAnomaly(dspy.Signature):
    """Analyze an event against the normal baseline and determine if it is anomalous.
    Consider all dimensions - amount, timing, location, frequency, sequence, and context.
    Severity reflects both how unusual the event is AND the potential impact if malicious."""

    event: str = dspy.InputField(
        desc="The event to analyze, as a JSON object or structured text"
    )
    baseline_summary: str = dspy.InputField(
        desc="Description of normal behavior for this type of event"
    )
    severity: Literal[tuple(SEVERITY_LEVELS)] = dspy.OutputField(
        desc="Anomaly severity - normal (expected), low (minor deviation), "
        "medium (notable deviation), high (strong anomaly signal), "
        "critical (likely fraud or attack, needs immediate action)"
    )
    explanation: str = dspy.OutputField(
        desc="Specific explanation citing concrete details - what exactly is unusual, "
        "how it deviates from the baseline, and what the risk is. "
        "Not vague ('looks suspicious') but specific ('transaction amount $4,800 "
        "is 12x the user's 30-day average of $400, combined with a new device "
        "and 3am local time')."
    )
    anomaly_score: float = dspy.OutputField(
        desc="Confidence score from 0.0 (clearly normal) to 1.0 (clearly anomalous)"
    )


anomaly_scorer = dspy.ChainOfThought(ScoreAnomaly)
```
Combine baseline construction and anomaly scoring into a single reusable module.
```python
class AnomalyDetector(dspy.Module):
    def __init__(self):
        super().__init__()
        self.baseline_summarizer = dspy.ChainOfThought(SummarizeBaseline)
        self.scorer = dspy.ChainOfThought(ScoreAnomaly)
        self._cached_baseline = None

    def set_baseline(self, historical_events: str):
        """Pre-compute the baseline summary from historical data."""
        result = self.baseline_summarizer(historical_events=historical_events)
        self._cached_baseline = result.baseline_summary
        return self._cached_baseline

    def forward(self, event: str, baseline_summary: str = None):
        # An explicitly passed baseline takes precedence over the cached one.
        baseline = baseline_summary or self._cached_baseline
        if not baseline:
            raise ValueError("No baseline set. Call set_baseline() first or pass baseline_summary.")
        return self.scorer(event=event, baseline_summary=baseline)
```
```python
detector = AnomalyDetector()

# Set baseline once from recent history
detector.set_baseline("""
Recent 30-day transactions:
- Average amount: $412, std dev: $180, max: $1,200
- Typical locations: New York, Chicago, LA
- Typical hours: 9am-9pm local time
- Devices: 1-2 known devices per user
- Frequency: 3-8 transactions/week per user
""")

result = detector(event='{"amount": 4800, "location": "Lagos", "hour": 3, "device": "unknown"}')
print(f"Severity: {result.severity}")
print(f"Score: {result.anomaly_score:.2f}")
print(f"Explanation: {result.explanation}")
```
Route events automatically based on severity, and escalate only when confidence is high enough.
```python
def route_anomaly(result) -> dict:
    """Route a scored anomaly to the appropriate action."""
    routing = {
        "normal": {"action": "dismiss", "notify": False, "block": False},
        "low": {"action": "log", "notify": False, "block": False},
        "medium": {"action": "queue", "notify": True, "block": False},
        "high": {"action": "alert", "notify": True, "block": False},
        "critical": {"action": "escalate", "notify": True, "block": True},
    }
    route = routing[result.severity]

    # Downgrade routing if confidence is low
    if result.anomaly_score < 0.6 and result.severity in ("high", "critical"):
        route = routing["medium"]  # reduce to queue for human review
        route["confidence_downgraded"] = True

    return {**route, "severity": result.severity, "score": result.anomaly_score}
```
| Severity | Score range | Default action | Blocks transaction |
|----------|-------------|----------------|--------------------|
| normal | 0.0 - 0.2 | Dismiss silently | No |
| low | 0.2 - 0.4 | Log for review | No |
| medium | 0.4 - 0.6 | Queue for analyst | No |
| high | 0.6 - 0.8 | Alert on-call | No |
| critical | 0.8 - 1.0 | Escalate + block | Yes |
Adjust these thresholds based on your false-positive tolerance.
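To make the score bands tunable, the cutoffs can live in one place. The sketch below maps a score to a band using the ranges in the table; `band_for_score` and `DEFAULT_CUTOFFS` are hypothetical names, and treating a boundary value such as 0.2 as belonging to the higher band is an assumption, since the table lists each boundary in two rows.

```python
from bisect import bisect_right

SEVERITY_LEVELS = ["normal", "low", "medium", "high", "critical"]

# Upper edges of the first four bands, taken from the table above.
DEFAULT_CUTOFFS = (0.2, 0.4, 0.6, 0.8)


def band_for_score(score: float, cutoffs=DEFAULT_CUTOFFS) -> str:
    """Map a 0.0-1.0 anomaly score to a severity band (hypothetical helper)."""
    if not 0.0 <= score <= 1.0:
        raise ValueError("score must be between 0.0 and 1.0")
    if len(cutoffs) != len(SEVERITY_LEVELS) - 1 or list(cutoffs) != sorted(cutoffs):
        raise ValueError("cutoffs must be four ascending values")
    # bisect_right counts how many cutoffs are <= score, which is the band index.
    return SEVERITY_LEVELS[bisect_right(cutoffs, score)]


print(band_for_score(0.65))                                # high
print(band_for_score(0.65, cutoffs=(0.3, 0.5, 0.7, 0.9)))  # medium
```

Raising the cutoffs, as in the second call, moves borderline events into lower bands and so trades missed anomalies for fewer false positives.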
Explanations are only useful if they are specific. Force the LM to cite concrete numbers and deviations by adding a dedicated explanation signature for high-severity events.
```python
class ExplainAnomaly(dspy.Signature):
    """Generate a reviewer-ready explanation of why this event is anomalous.
    Write for a human analyst who needs to decide quickly. Cite specific numbers,
    list each anomalous dimension separately, and state the risk clearly."""

    event: str = dspy.InputField(desc="The flagged event")
    baseline_summary: str = dspy.InputField(desc="Normal behavior baseline")
    severity: str = dspy.InputField(desc="Assigned severity level")
    reviewer_explanation: str = dspy.OutputField(
        desc="Bullet-point explanation for human reviewer: what deviates, by how much, "
        "and what action is recommended. Must cite specific values from the event."
    )


explainer = dspy.ChainOfThought(ExplainAnomaly)
```

```python
# Only invoke for high/critical - saves cost.
# `result` is the scorer output; `event` and `baseline` are the inputs it was scored with.
if result.severity in ("high", "critical"):
    detail = explainer(
        event=event,
        baseline_summary=baseline,
        severity=result.severity,
    )
    print(detail.reviewer_explanation)
```
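Whether an explanation actually cites concrete values can be checked mechanically. The sketch below is one rough way to do that: it assumes the event is a flat JSON object, and `cited_event_values` is a hypothetical helper, not part of DSPy. It only looks for the event's numeric values in the explanation text, so it is a heuristic and not a full validator.

```python
import json
import re


def _numbers_in(text: str) -> set[float]:
    """Extract numeric literals from free text, tolerating '$4,800'-style commas."""
    return {
        float(token.replace(",", ""))
        for token in re.findall(r"\d[\d,]*(?:\.\d+)?", text)
    }


def cited_event_values(event_json: str, explanation: str) -> list[float]:
    """Return the numeric event values that the explanation mentions (hypothetical helper)."""
    event = json.loads(event_json)  # assumption: a flat JSON object
    event_numbers = {
        float(value)
        for value in event.values()
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    }
    return sorted(event_numbers & _numbers_in(explanation))


event = '{"amount": 4800, "location": "Lagos", "hour": 3, "device": "unknown"}'
specific = "Amount $4,800 is 12x the user's 30-day average of $400, at 3am local time."
vague = "This transaction looks suspicious."

print(cited_event_values(event, specific))  # [3.0, 4800.0]
print(cited_event_values(event, vague))     # []
```

An empty result is a reasonable signal to regenerate the explanation or to flag it for the reviewer as unsupported.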
Batch multiple events together (same user, same time window) before scoring to give the LM session-level context.
```python
import json


class ScoreSession(dspy.Signature):
    """Analyze a sequence of events from the same user session for anomalies.
    Consider the pattern across events, not just individual events in isolation.
    Rapid-fire actions, escalating amounts, or device switches mid-session
    are signals invisible when events are scored individually."""

    session_events: str = dspy.InputField(
        desc="JSON array of events from the same user/session, in chronological order"
    )
    baseline_summary: str = dspy.InputField(desc="Normal behavior baseline for this user")
    severity: Literal[tuple(SEVERITY_LEVELS)] = dspy.OutputField()
    explanation: str = dspy.OutputField(
        desc="What pattern across the session is anomalous, not just individual events"
    )
    anomaly_score: float = dspy.OutputField()


session_scorer = dspy.ChainOfThought(ScoreSession)


def score_user_window(events: list[dict], baseline: str, window_minutes: int = 15):
    """Score a batch of events from one user as a single session."""
    # Sorting by timestamp and splitting into `window_minutes` windows is left to
    # the caller here: every event passed in is scored together as one session.
    events_json = json.dumps(events, indent=2)
    return session_scorer(session_events=events_json, baseline_summary=baseline)
```
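The grouping step itself can be sketched in plain Python. The version below assumes each event carries an ISO 8601 `timestamp` field (a hypothetical field name, since the event schema is not specified here) and starts a new window once an event falls `window_minutes` or more after the first event of the current window.

```python
from datetime import datetime, timedelta


def group_into_windows(events, window_minutes=15, ts_key="timestamp"):
    """Split one user's events into chronological fixed-length windows (hypothetical helper)."""
    ordered = sorted(events, key=lambda e: datetime.fromisoformat(e[ts_key]))
    windows = []
    window_start = None
    for event in ordered:
        ts = datetime.fromisoformat(event[ts_key])
        # Open a new window for the first event, or once the current window has elapsed.
        if window_start is None or ts - window_start >= timedelta(minutes=window_minutes):
            windows.append([])
            window_start = ts
        windows[-1].append(event)
    return windows


events = [
    {"timestamp": "2024-01-01T10:14:00", "amount": 900},
    {"timestamp": "2024-01-01T10:00:00", "amount": 50},
    {"timestamp": "2024-01-01T10:05:00", "amount": 300},
    {"timestamp": "2024-01-01T10:20:00", "amount": 40},
    {"timestamp": "2024-01-01T11:00:00", "amount": 25},
]
for window in group_into_windows(events):
    print([e["amount"] for e in window])
# [50, 300, 900]
# [40]
# [25]
```

Each resulting window could then be passed to a session-level scorer separately, so that one long-lived user does not produce an unbounded prompt.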
```python
from dspy.evaluate import Evaluate

# Build a labeled dataset - events with known ground truth.
# Each example's `severity` field is the ground-truth label.
labeled_events = [
    dspy.Example(
        event='{"amount": 4800, "location": "Lagos", "hour": 3, "device": "unknown"}',
        baseline_summary="Avg $400, NY/Chicago, 9am-9pm, known devices",
        severity="critical",
    ).with_inputs("event", "baseline_summary"),
    dspy.Example(
        event='{"amount": 380, "location": "New York", "hour": 14, "device": "iPhone-known"}',
        baseline_summary="Avg $400, NY/Chicago, 9am-9pm, known devices",
        severity="normal",
    ).with_inputs("event", "baseline_summary"),
    # ... more examples
]

# 80/20 train/dev split
trainset = labeled_events[:int(len(labeled_events) * 0.8)]
devset = labeled_events[int(len(labeled_events) * 0.8):]
```
```python
def anomaly_metric(example, prediction, trace=None):
    """Measure exact severity match, or partial credit for adjacent severities."""
    if prediction.severity == example.severity:
        return 1.0
    # Adjacent severity is a partial match (e.g., predicted high vs actual critical)
    order = {s: i for i, s in enumerate(SEVERITY_LEVELS)}
    distance = abs(order[prediction.severity] - order[example.severity])
    return max(0.0, 1.0 - distance * 0.3)


evaluator = Evaluate(
    devset=devset,
    metric=anomaly_metric,
    num_threads=4,
    display_progress=True,
    display_table=5,
)
score = evaluator(anomaly_scorer)
print(f"Baseline accuracy: {score:.1f}%")
```
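The partial-credit arithmetic in `anomaly_metric` is easier to see with concrete numbers. The sketch below repeats the metric so it runs on its own and uses `SimpleNamespace` objects as stand-ins for DSPy examples and predictions (an assumption made so that no LM call is needed).

```python
from types import SimpleNamespace

SEVERITY_LEVELS = ["normal", "low", "medium", "high", "critical"]


def anomaly_metric(example, prediction, trace=None):
    """Exact severity match scores 1.0; each level of distance costs 0.3."""
    if prediction.severity == example.severity:
        return 1.0
    order = {s: i for i, s in enumerate(SEVERITY_LEVELS)}
    distance = abs(order[prediction.severity] - order[example.severity])
    return max(0.0, 1.0 - distance * 0.3)


# Score every possible prediction against a ground truth of "critical".
truth = SimpleNamespace(severity="critical")
scores = {
    level: round(anomaly_metric(truth, SimpleNamespace(severity=level)), 2)
    for level in SEVERITY_LEVELS
}
print(scores)
# {'normal': 0.0, 'low': 0.1, 'medium': 0.4, 'high': 0.7, 'critical': 1.0}
```

With a 0.3 penalty per level, a prediction that is off by one level still earns 0.7, while a prediction four levels away is floored at 0.0.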
```python
optimizer = dspy.BootstrapFewShot(
    metric=anomaly_metric,
    max_bootstrapped_demos=4,
)
optimized_detector = optimizer.compile(anomaly_scorer, trainset=trainset)

# Re-evaluate
score = evaluator(optimized_detector)
print(f"Optimized accuracy: {score:.1f}%")

# Save
optimized_detector.save("anomaly_scorer.json")
```
```python
def false_positive_rate(examples, predictions):
    """Compute FPR - normal events flagged as anomalous."""
    normals = [(e, p) for e, p in zip(examples, predictions) if e.severity == "normal"]
    if not normals:
        return 0.0
    flagged = sum(1 for _, p in normals if p.severity != "normal")
    return flagged / len(normals)
```
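A small worked example shows how the rate behaves on toy data. The sketch repeats `false_positive_rate` so it runs standalone, and adds a hypothetical companion, `miss_rate`, for the opposite error (truly anomalous events predicted as normal); that companion is an illustration and not part of the original skill. `SimpleNamespace` objects stand in for DSPy examples and predictions.

```python
from types import SimpleNamespace


def false_positive_rate(examples, predictions):
    """Share of truly normal events that were flagged as anything but normal."""
    normals = [(e, p) for e, p in zip(examples, predictions) if e.severity == "normal"]
    if not normals:
        return 0.0
    flagged = sum(1 for _, p in normals if p.severity != "normal")
    return flagged / len(normals)


def miss_rate(examples, predictions):
    """Share of truly anomalous events predicted as normal (hypothetical companion)."""
    anomalies = [(e, p) for e, p in zip(examples, predictions) if e.severity != "normal"]
    if not anomalies:
        return 0.0
    missed = sum(1 for _, p in anomalies if p.severity == "normal")
    return missed / len(anomalies)


def labels(*severities):
    """Build stand-in objects that expose only a `severity` attribute."""
    return [SimpleNamespace(severity=s) for s in severities]


truth = labels("normal", "normal", "normal", "normal", "critical", "high")
preds = labels("normal", "low", "normal", "medium", "critical", "normal")

print(false_positive_rate(truth, preds))  # 0.5 (2 of 4 normal events flagged)
print(miss_rate(truth, preds))            # 0.5 (1 of 2 anomalies missed)
```

Tracking both numbers side by side makes the effect of a threshold change visible in each direction.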
| Pattern | When to use |
|---------|-------------|
| Single event + baseline | Simple fraud scoring, log triage |
| Session window scoring | Account takeover, multi-step attacks |
| Two-stage (pre-filter + LM) | High-volume streams - rule-based pre-filter, LM on candidates |
| Cached baseline | Baselines are stable - build once per period, reuse |
| Explanation-on-demand | Cost savings - only generate detailed explanations for high/critical |
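The two-stage pattern in the table could look roughly like the sketch below: cheap rules decide which events are worth an LM call, and everything else is labeled normal without one. The rule thresholds are assumptions borrowed from the example baseline earlier in this document, and `prefilter`, `two_stage`, and `stub_scorer` are hypothetical names; a real `score_with_lm` would presumably call the DSPy scorer and return its severity.

```python
from typing import Callable

KNOWN_LOCATIONS = frozenset({"New York", "Chicago", "LA"})  # assumption: from the sample baseline


def prefilter(event: dict, max_amount: float = 1200) -> bool:
    """Return True when cheap rules mark the event as a candidate for LM scoring."""
    return (
        event.get("amount", 0) > max_amount
        or event.get("location") not in KNOWN_LOCATIONS
        or event.get("device") == "unknown"
    )


def two_stage(events: list, score_with_lm: Callable[[dict], str]) -> list:
    """Run the LM only on candidates; all other events are labeled normal."""
    results = []
    for event in events:
        severity = score_with_lm(event) if prefilter(event) else "normal"
        results.append((event, severity))
    return results


# Stub scorer that records its calls, so the sketch runs without an LM.
lm_calls = []


def stub_scorer(event: dict) -> str:
    lm_calls.append(event)
    return "high"


events = [
    {"amount": 380, "location": "New York", "device": "iPhone-known"},
    {"amount": 4800, "location": "Lagos", "device": "unknown"},
]
results = two_stage(events, stub_scorer)
print([severity for _, severity in results])  # ['normal', 'high']
print(len(lm_calls))                          # 1
```

The pre-filter should err on the side of passing events through, since anything it drops never reaches the LM.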
- Include `baseline_summary` in the signature. Without it, the model has no reference point and defaults to flagging anything non-trivial.
- Use `Literal` with five graduated severity levels (normal, low, medium, high, critical) so downstream routing can take proportional action.
- Do not use `dspy.Assert`/`dspy.Suggest` for severity validation. Instead, use `dspy.Refine` with a reward function that checks severity is one of the valid values and the explanation cites specific numbers.
- Add a `desc` on the explanation field requiring concrete deviations, specific values, and a risk statement.

Install any skill:
npx skills add lebsral/DSPy-Programming-not-prompting-LMs-skills --skill <name>
- /ai-scoring
- /ai-improving-accuracy
- /ai-generating-data
- /dspy-chain-of-thought
- /dspy-refine
- /dspy-best-of-n
- Install /ai-do if you do not have it - it routes any AI problem to the right skill and is the fastest way to work: npx skills add lebsral/DSPy-Programming-not-prompting-LMs-skills --skill ai-do