.fleet/context/skills/dspy-agent-framework-integration/SKILL.md
Comprehensive guide to integrating DSPy with Microsoft Agent Framework in AgenticFleet, covering typed signatures, assertions, routing cache, GEPA optimization, and agent handoffs.
npx skillsauth add qredence/agentic-fleet dspy-agent-framework-integration
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
A comprehensive guide to the integration patterns between DSPy and Microsoft Agent Framework in AgenticFleet. This skill documents how to leverage DSPy's structured reasoning capabilities with the Agent Framework's orchestration primitives.
AgenticFleet combines DSPy, for prompt optimization and structured outputs, with Microsoft Agent Framework, for multi-agent orchestration. The components fit together as follows:
┌─────────────────────────────────────────────────────────────────┐
│                    AgenticFleet Integration                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐        │
│  │ DSPyReasoner│────►│ AgentFactory│────►│  ChatAgent  │        │
│  │ (Signatures)│     │(YAML Config)│     │ (Enhanced)  │        │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘        │
│         │                   │                   │               │
│  ┌──────▼───────────────────▼───────────────────▼──────┐        │
│  │              Microsoft Agent Framework              │        │
│  │  ┌──────────┐  ┌───────────┐  ┌──────────────────┐  │        │
│  │  │ Workflow │  │AgentThread│  │CheckpointStorage │  │        │
│  │  └──────────┘  └───────────┘  └──────────────────┘  │        │
│  └─────────────────────────────────────────────────────┘        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
All DSPy signatures in AgenticFleet use Pydantic models for structured outputs:
# src/agentic_fleet/dspy_modules/signatures.py
import dspy
from pydantic import BaseModel, Field
from typing import Literal


class TaskAnalysisOutput(BaseModel):
    """Pydantic model for typed signature output."""

    complexity: Literal["low", "medium", "high"] = Field(
        description="Estimated task complexity"
    )
    required_capabilities: list[str] = Field(
        description="List of required capabilities"
    )
    estimated_steps: int = Field(ge=1, le=50)
    preferred_tools: list[str] = Field(default_factory=list)
    needs_web_search: bool = Field(description="Whether web search needed")
    reasoning: str = Field(description="Reasoning behind analysis")


# Defined after TaskAnalysisOutput so the annotation below resolves
# when the class body is evaluated.
class TaskAnalysis(dspy.Signature):
    """Analyze a task with structured output."""

    task: str = dspy.InputField(desc="The user's task description")
    analysis: TaskAnalysisOutput = dspy.OutputField(
        desc="Structured analysis of the task"
    )
# src/agentic_fleet/dspy_modules/reasoner.py
import dspy


class DSPyReasoner(dspy.Module):
    def __init__(self):
        super().__init__()
        # dspy.Predict handles Pydantic-typed output fields directly in
        # DSPy 3.x, so the older TypedPredictor wrapper is not needed.
        self.analyzer = dspy.Predict(TaskAnalysis)

    def analyze(self, task: str) -> TaskAnalysisOutput:
        result = self.analyzer(task=task)
        return result.analysis
Normalize inputs with Pydantic validators:
from typing import Literal

from pydantic import BaseModel, Field, field_validator


class RoutingDecisionOutput(BaseModel):
    assigned_to: list[str] = Field(min_length=1)
    execution_mode: Literal["delegated", "sequential", "parallel"]

    @field_validator("assigned_to", mode="before")
    @classmethod
    def normalize_agents(cls, v: str | list[str]) -> list[str]:
        # Accept a comma-separated string as well as a list
        if isinstance(v, str):
            return [a.strip() for a in v.split(",") if a.strip()]
        return v

    @field_validator("execution_mode", mode="before")
    @classmethod
    def normalize_mode(cls, v: str) -> str:
        mapping = {
            "delegate": "delegated",
            "single": "delegated",
            "sequence": "sequential",
            "concurrent": "parallel",
        }
        # Fall back to the cleaned value so " Parallel " still validates
        cleaned = v.strip().lower()
        return mapping.get(cleaned, cleaned)
Routing validation uses two assertion types from DSPy's assertions API (DSPy releases from 2.6 onward replace this API with `dspy.Refine` and `dspy.BestOfN`):
# src/agentic_fleet/dspy_modules/assertions.py
import dspy

# Hard constraint: causes backtracking on failure
#   dspy.Assert(condition, "error message")
# Soft constraint: guides optimization without failure
#   dspy.Suggest(condition, "guidance message")


def validate_agent_exists(
    assigned_agents: list[str],
    available_agents: list[str]
) -> bool:
    """Check all assigned agents exist in available pool."""
    # Hard constraint: must assign at least one agent
    dspy.Assert(len(assigned_agents) > 0, "Must assign at least one agent")
    # Hard constraint: every assigned agent must be in the pool (case-insensitive)
    available = {a.lower() for a in available_agents}
    for agent in assigned_agents:
        dspy.Assert(
            agent.lower() in available,
            f"Agent '{agent}' not in available pool"
        )
    return True


def validate_execution_mode(
    assigned_agents: list[str],
    execution_mode: str
) -> bool:
    """Ensure execution mode matches agent count."""
    if len(assigned_agents) > 1 and execution_mode == "delegated":
        # Soft constraint: the condition is False in this branch,
        # so the suggestion is always recorded here
        dspy.Suggest(
            len(assigned_agents) == 1,
            "Consider using 'parallel' for multiple agents"
        )
    return True
class TaskRouting(dspy.Signature):
    task: str = dspy.InputField(desc="The task to route")
    team: str = dspy.InputField(desc="Available agents")
    context: str = dspy.InputField(desc="Execution context")
    decision: RoutingDecisionOutput = dspy.OutputField()


# A signature only declares fields, so the validation lives in a module.
# ValidatedRouter is an illustrative name; extract_agent_names is assumed
# to be defined elsewhere in the project.
class ValidatedRouter(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict(TaskRouting)

    def forward(self, task: str, team: str, context: str):
        # Extract agent names from team description
        available_agents = extract_agent_names(team)
        result = self.predict(task=task, team=team, context=context)
        # Validate routing decision
        validate_agent_exists(result.decision.assigned_to, available_agents)
        validate_execution_mode(
            result.decision.assigned_to,
            result.decision.execution_mode
        )
        return result
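The difference between a hard and a soft constraint can be shown without DSPy. The sketch below is framework-free: `RoutingError` and `check_routing` are hypothetical names chosen for illustration, and the code does not reproduce DSPy's backtracking. It only shows that a hard violation stops the routing while a soft one is reported and execution continues.

```python
class RoutingError(ValueError):
    """Raised when a hard routing constraint is violated (hypothetical)."""


def check_routing(assigned: list[str], available: list[str], mode: str) -> list[str]:
    """Raise on hard violations; return soft warnings as a list."""
    if not assigned:
        raise RoutingError("Must assign at least one agent")
    known = {name.lower() for name in available}
    for agent in assigned:
        if agent.lower() not in known:
            raise RoutingError(f"Agent '{agent}' not in available pool")
    warnings = []
    if len(assigned) > 1 and mode == "delegated":
        warnings.append("Consider using 'parallel' for multiple agents")
    return warnings


print(check_routing(["Researcher", "coder"], ["researcher", "coder"], "delegated"))
# ["Consider using 'parallel' for multiple agents"]
```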
# src/agentic_fleet/agents/base.py
from agent_framework._agents import ChatAgent
import dspy


class DSPyEnhancedAgent(ChatAgent):
    def __init__(
        self,
        name: str,
        chat_client,
        instructions: str = "",
        enable_dspy: bool = True,
        reasoning_strategy: str = "chain_of_thought",
        **kwargs
    ):
        super().__init__(
            name=name,
            instructions=instructions,
            chat_client=chat_client,
            **kwargs
        )
        self.enable_dspy = enable_dspy
        self.reasoning_strategy = reasoning_strategy
        # Initialize reasoning modules
        if enable_dspy:
            self._init_reasoning_modules()

    def _init_reasoning_modules(self):
        """Initialize DSPy reasoning strategies."""
        if self.reasoning_strategy == "react":
            self.react_module = dspy.ReAct(
                "question -> answer",
                tools=self.tools
            )
        elif self.reasoning_strategy == "program_of_thought":
            self.pot_module = dspy.ProgramOfThought("question -> answer")
        elif self.reasoning_strategy == "chain_of_thought":
            self.cot_module = dspy.ChainOfThought("question -> answer")
class DSPyEnhancedAgent(ChatAgent):
    def _enhance_task_with_dspy(self, task: str, context: str = "") -> str:
        """Enhance task using DSPy reasoning."""
        if not self.enable_dspy:
            return task
        # Use Chain of Thought for complex tasks
        enhancer = dspy.ChainOfThought("task, context -> enhanced_task")
        result = enhancer(
            task=task,
            context=context or "No prior context"
        )
        return result.enhanced_task

    async def run(self, message, **kwargs):
        # Enhance task before execution
        enhanced_message = self._enhance_task_with_dspy(
            message,
            kwargs.get("context", "")
        )
        # Run with enhanced task
        return await super().run(enhanced_message, **kwargs)
# src/agentic_fleet/dspy_modules/reasoner_cache.py
import time
from typing import Any
from collections import OrderedDict


class RoutingCache:
    """TTL-based cache for routing decisions."""

    def __init__(self, ttl_seconds: int = 300, max_size: int = 1024):
        self.ttl = ttl_seconds
        self.max_size = max_size
        self._cache: OrderedDict[str, tuple[Any, float]] = OrderedDict()

    def get(self, key: str) -> Any | None:
        """Get cached value if not expired."""
        if key not in self._cache:
            return None
        value, timestamp = self._cache[key]
        # Check TTL
        if time.time() - timestamp > self.ttl:
            del self._cache[key]
            return None
        # Move to end (LRU)
        self._cache.move_to_end(key)
        return value

    def set(self, key: str, value: Any) -> None:
        """Cache value with current timestamp."""
        if key in self._cache:
            # Overwriting an existing key needs no eviction; mark it as most recent
            self._cache.move_to_end(key)
        elif len(self._cache) >= self.max_size:
            # Evict oldest if at capacity
            self._cache.popitem(last=False)
        self._cache[key] = (value, time.time())

    def clear(self) -> None:
        """Clear all cached entries."""
        self._cache.clear()
# src/agentic_fleet/dspy_modules/reasoner.py
import hashlib

import dspy


class DSPyReasoner(dspy.Module):
    def __init__(self, enable_routing_cache: bool = True, **kwargs):
        super().__init__()
        self.enable_routing_cache = enable_routing_cache
        self._routing_cache = RoutingCache(
            ttl_seconds=kwargs.get("cache_ttl_seconds", 300),
            max_size=kwargs.get("cache_max_entries", 1024)
        )

    def _generate_cache_key(
        self,
        task: str,
        team: str,
        context: str
    ) -> str:
        """Generate cache key from routing inputs."""
        content = f"{task}:{team}:{context}"
        return hashlib.md5(content.encode()).hexdigest()

    def route(self, task: str, team: str, context: str) -> RoutingDecisionOutput:
        cache_key = None
        # Check cache first
        if self.enable_routing_cache:
            cache_key = self._generate_cache_key(task, team, context)
            cached = self._routing_cache.get(cache_key)
            if cached is not None:
                return cached
        # Execute routing (self.router is set up elsewhere in the class)
        result = self.router(task=task, team=team, context=context)
        decision = result.decision
        # Cache result
        if self.enable_routing_cache:
            self._routing_cache.set(cache_key, decision)
        return decision
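One detail of the key scheme is worth a closer look: joining the inputs with `:` means different inputs can produce the same string when a field itself contains a colon. A possible alternative, sketched here under the assumption that unambiguous keys matter for your routing inputs, is to hash a JSON array of the fields. `routing_cache_key` is a hypothetical helper, not the project's function.

```python
import hashlib
import json


def routing_cache_key(task: str, team: str, context: str) -> str:
    """Hash the inputs as a JSON array so field boundaries stay unambiguous."""
    payload = json.dumps([task, team, context], ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# With a plain ":" join, both input sets below collapse to "a:b:c:".
key_one = routing_cache_key("a:b", "c", "")
key_two = routing_cache_key("a", "b:c", "")
print(key_one != key_two)  # True
```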
# src/agentic_fleet/config/workflow_config.yaml
dspy:
  optimization:
    enabled: true
    examples_path: src/agentic_fleet/data/supervisor_examples.json
    use_gepa: true
    gepa_auto: light  # light|medium|heavy
    gepa_reflection_model: gpt-5-mini
    gepa_history_min_quality: 8.0
    gepa_history_limit: 200
    gepa_val_split: 0.2
    gepa_seed: 13
    gepa_log_dir: .var/logs/dspy/gepa
# Run GEPA optimization
agentic-fleet optimize
# Output: .var/cache/dspy/compiled_reasoner.json
# src/agentic_fleet/dspy_modules/reasoner.py
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def _load_compiled_module(self) -> None:
    """Load the optimized (compiled) reasoner state from disk."""
    compiled_path = get_configured_compiled_reasoner_path()
    meta_path = Path(f"{compiled_path}.meta")
    if compiled_path.exists():
        # Verify source hash matches
        if meta_path.exists():
            meta = json.loads(meta_path.read_text())
            expected_hash = meta.get("reasoner_source_hash")
            if expected_hash != get_reasoner_source_hash():
                # The compiled artifact was built from different source code
                logger.info("Compiled reasoner ignored (source hash mismatch)")
                return
        logger.info(f"Loading compiled reasoner from {compiled_path}")
        self.load(str(compiled_path))
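The hash check above guards against loading a compiled artifact that was built from older source code. The sketch below shows the idea with temporary files. It makes two assumptions that the source does not confirm: that the hash covers a single source file (how `get_reasoner_source_hash` actually computes its value is not shown), and that a missing `.meta` file counts as stale, which is stricter than the loader above. `source_hash` and `is_compiled_fresh` are hypothetical names.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def source_hash(source_path: Path) -> str:
    """Hash the source file contents (assumed hashing scheme)."""
    return hashlib.sha256(source_path.read_bytes()).hexdigest()


def is_compiled_fresh(compiled_path: Path, source_path: Path) -> bool:
    """True only if the artifact's recorded hash matches the current source."""
    meta_path = Path(f"{compiled_path}.meta")
    if not (compiled_path.exists() and meta_path.exists()):
        return False
    meta = json.loads(meta_path.read_text())
    return meta.get("reasoner_source_hash") == source_hash(source_path)


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "reasoner.py"
    compiled = Path(tmp) / "compiled_reasoner.json"
    source.write_text("VERSION = 1\n")
    compiled.write_text("{}")
    Path(f"{compiled}.meta").write_text(
        json.dumps({"reasoner_source_hash": source_hash(source)})
    )
    fresh_before = is_compiled_fresh(compiled, source)
    source.write_text("VERSION = 2\n")  # editing the source invalidates the artifact
    fresh_after = is_compiled_fresh(compiled, source)

print(fresh_before, fresh_after)  # True False
```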
# src/agentic_fleet/agents/coordinator.py
from agent_framework._agents import ChatAgent


class AgentFactory:
    def create_agent(self, name: str, config: dict) -> ChatAgent:
        """Create ChatAgent from YAML configuration."""
        model_id = config.get("model")
        instructions = self._resolve_instructions(config.get("instructions", ""))
        tools = self._resolve_tools(config.get("tools", []))
        return ChatAgent(
            name=name,
            description=config.get("description", ""),
            instructions=instructions,
            chat_client=self._create_chat_client(model_id),
            tools=tools
        )

    def _resolve_instructions(self, instructions_ref: str) -> str:
        """Resolve dynamic prompts or static references."""
        if instructions_ref.startswith("prompts."):
            # Dynamic DSPy prompt generation
            return self._generate_dynamic_prompt(instructions_ref)
        # Static prompt lookup
        return get_static_prompt(instructions_ref)
# src/agentic_fleet/agents/coordinator.py
from dspy import ChainOfThought
from agentic_fleet.dspy_modules.signatures import PlannerInstructionSignature


class AgentFactory:
    def __init__(self):
        self.instruction_generator = ChainOfThought(PlannerInstructionSignature)

    def _generate_dynamic_prompt(self, ref: str) -> str:
        """Generate prompt using DSPy."""
        if ref == "prompts.planner":
            result = self.instruction_generator(
                available_agents=self._get_agent_descriptions(),
                task_goals="Plan and coordinate multi-agent workflows"
            )
            return result.instructions
        return ""
# src/agentic_fleet/workflows/supervisor.py
from agent_framework._workflows import (
    WorkflowStartedEvent,
    WorkflowStatusEvent,
    WorkflowOutputEvent,
    ExecutorCompletedEvent,
    RequestInfoEvent,  # HITL support
    FileCheckpointStorage
)


class SupervisorWorkflow:
    def __init__(self, context, checkpoint_dir: str = ".var/checkpoints"):
        self.context = context
        self.checkpoint_storage = FileCheckpointStorage(checkpoint_dir)

    async def run_stream(self, task: str, checkpoint_id: str | None = None):
        """Run workflow with optional checkpoint resume."""
        if checkpoint_id:
            # Resume from checkpoint. _resume_from_checkpoint is an async
            # generator, so iterate it and re-yield instead of awaiting it.
            async for event in self._resume_from_checkpoint(checkpoint_id):
                yield event
        else:
            # Start fresh
            async for event in self._execute_pipeline(task):
                yield event

    async def _resume_from_checkpoint(self, checkpoint_id: str):
        """Resume workflow execution from checkpoint."""
        state = self.checkpoint_storage.load(checkpoint_id)
        # Restore workflow state
        self.context.restore_from_state(state)
        # Continue execution
        async for event in self._continue_pipeline():
            yield event
# src/agentic_fleet/workflows/strategies.py
from agent_framework._agents import ChatAgent


class HandoffManager:
    """Manage agent-to-agent transfers with context preservation."""

    def __init__(self):
        self._handoff_history: list[dict] = []

    def prepare_handoff(
        self,
        from_agent: ChatAgent | None,
        to_agent: ChatAgent,
        context: dict
    ) -> dict:
        """Prepare handoff input with accumulated context."""
        # Copy the lists so later updates to context do not alter recorded handoffs
        handoff_input = {
            "task": context.get("original_task"),
            "findings": list(context.get("findings", [])),
            "decisions": list(context.get("decisions", [])),
            "remaining_work": list(context.get("remaining_work", [])),
            # The first agent in a sequence has no predecessor
            "from_agent_summary": (
                self._summarize_agent_work(from_agent) if from_agent else None
            )
        }
        self._handoff_history.append({
            "from": from_agent.name if from_agent else None,
            "to": to_agent.name,
            "input": handoff_input
        })
        return handoff_input

    def execute_sequential_with_handoffs(
        self,
        agents: list[ChatAgent],
        tasks: list[str]
    ) -> list[dict]:
        """Execute tasks with agent handoffs."""
        context = {"original_task": tasks[0], "findings": [], "decisions": []}
        results = []
        for i, (agent, task) in enumerate(zip(agents, tasks)):
            context["remaining_work"] = tasks[i + 1:]
            handoff_input = self.prepare_handoff(
                from_agent=agents[i - 1] if i > 0 else None,
                to_agent=agent,
                context=context
            )
            result = self._run_agent_with_context(agent, task, handoff_input)
            context["findings"].extend(result.get("findings", []))
            context["decisions"].extend(result.get("decisions", []))
            results.append(result)
        return results
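The handoff loop can be traced end to end with stand-in agents. This is a simplified sketch, not the project's `HandoffManager`: `StubAgent` and `run_with_handoffs` are hypothetical, and the stub's `run` method returns canned findings instead of calling a model. It shows what the second agent receives from the first.

```python
from dataclasses import dataclass


@dataclass
class StubAgent:
    """Stand-in for a real agent; returns a canned finding (hypothetical)."""
    name: str

    def run(self, task: str, handoff: dict) -> dict:
        return {"findings": [f"{self.name} finished '{task}'"], "decisions": []}


def run_with_handoffs(agents: list[StubAgent], tasks: list[str]) -> list[dict]:
    """Run tasks in order and record the handoff each agent received."""
    context = {"original_task": tasks[0], "findings": []}
    history = []
    for index, (agent, task) in enumerate(zip(agents, tasks)):
        handoff = {
            "task": context["original_task"],
            "findings": list(context["findings"]),  # snapshot, not a shared reference
            "remaining_work": tasks[index + 1:],
            "from": agents[index - 1].name if index > 0 else None,
        }
        history.append(handoff)
        result = agent.run(task, handoff)
        context["findings"].extend(result["findings"])
    return history


history = run_with_handoffs(
    [StubAgent("researcher"), StubAgent("writer")],
    ["gather sources", "draft summary"],
)
print(history[0]["from"], history[0]["findings"])  # None []
print(history[1]["from"], history[1]["findings"])
# researcher ["researcher finished 'gather sources'"]
```

Taking a snapshot of the findings at each handoff keeps the recorded history accurate even though the shared context keeps growing.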
# DSPy Configuration
dspy:
  model: gpt-5-mini
  routing_model: gpt-5-mini
  use_typed_signatures: true
  enable_routing_cache: true
  routing_cache_ttl_seconds: 300
  require_compiled: false  # true in production

  # Dynamic Prompts
  dynamic_prompts:
    enabled: true
    signatures_path: src/agentic_fleet/dspy_modules/signatures.py

  # GEPA Optimization
  optimization:
    enabled: true
    use_gepa: true
    gepa_auto: light

# Workflow Configuration
workflow:
  supervisor:
    max_rounds: 15
    enable_streaming: true
  checkpointing:
    checkpoint_dir: .var/checkpoints

# Agent Configuration
agents:
  researcher:
    model: gpt-4.1-mini
    tools: [TavilySearchTool]
    reasoning:
      effort: medium
      verbosity: normal
# src/agentic_fleet/workflows/helpers.py
import re


def is_simple_task(task: str) -> bool:
    """Check if task qualifies for fast-path processing."""
    simple_patterns = [
        # \b keeps "history ..." from matching the greeting "hi"
        r"^(hi|hello|hey|how are you|what's up)\b",
        r"^\d+\s*[\+\-\*/]\s*\d+$",  # Simple math
        r"^(what is|who is|where is|when did)\s+\w+",  # Simple facts
    ]
    return any(re.match(p, task.lower()) for p in simple_patterns)
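A quick way to sanity-check the fast-path patterns is to run them over a handful of inputs. The sketch below restates the check in self-contained form. It strips surrounding whitespace and adds a `\b` after the greeting alternatives so that a task beginning with "History" is not mistaken for "hi"; both are choices made for this sketch, and the sample inputs are made up.

```python
import re

SIMPLE_PATTERNS = [
    r"^(hi|hello|hey|how are you|what's up)\b",     # greetings
    r"^\d+\s*[\+\-\*/]\s*\d+$",                     # simple math
    r"^(what is|who is|where is|when did)\s+\w+",   # simple facts
]


def is_simple_task(task: str) -> bool:
    """Return True when the task matches any fast-path pattern."""
    text = task.strip().lower()
    return any(re.match(pattern, text) for pattern in SIMPLE_PATTERNS)


samples = [
    "Hello!",
    "2 + 2",
    "History of the Roman Empire",
    "Refactor the billing service",
]
for sample in samples:
    print(f"{sample!r}: {is_simple_task(sample)}")
```

Only the first two samples take the fast path. Note that the third pattern is permissive: any task starting with "what is" followed by a word matches, however long the rest of the request is.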
# src/agentic_fleet/workflows/strategies.py
import asyncio

from agent_framework._agents import ChatAgent


async def execute_parallel(
    agents: list[ChatAgent],
    task: str
) -> list[dict]:
    """Execute task across multiple agents concurrently."""
    async def run_agent(agent):
        return {
            "agent": agent.name,
            "result": await agent.run(task)
        }

    # gather returns results in the order the agents were passed in
    results = await asyncio.gather(*[run_agent(a) for a in agents])
    return results
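The concurrent fan-out can be tried without any model calls. In this sketch `StubAgent` is a hypothetical stand-in whose `run` method sleeps briefly and returns a string. It illustrates that `asyncio.gather` returns results in the order the awaitables were passed, even when a later agent finishes first.

```python
import asyncio


class StubAgent:
    """Stand-in agent that sleeps instead of calling a model (hypothetical)."""

    def __init__(self, name: str, delay: float):
        self.name = name
        self.delay = delay

    async def run(self, task: str) -> str:
        await asyncio.sleep(self.delay)
        return f"{self.name} handled: {task}"


async def execute_parallel(agents: list[StubAgent], task: str) -> list[dict]:
    """Run the same task on every agent concurrently."""
    async def run_agent(agent: StubAgent) -> dict:
        return {"agent": agent.name, "result": await agent.run(task)}

    return await asyncio.gather(*(run_agent(a) for a in agents))


agents = [StubAgent("slow", 0.02), StubAgent("fast", 0.0)]
results = asyncio.run(execute_parallel(agents, "summarize"))
print([r["agent"] for r in results])  # ['slow', 'fast'] (input order, not completion order)
```

If one agent raising should not cancel the whole batch, passing `return_exceptions=True` to `asyncio.gather` returns the exception object in that agent's slot instead.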
# src/agentic_fleet/workflows/executors.py
# Method of the executor class (class header omitted); it needs self
# to reach the reasoner and the refinement helper.
async def run_quality_phase(
    self,
    task: str,
    result: str,
    threshold: float = 7.0
) -> tuple[str, bool]:
    """Evaluate quality and refine if needed."""
    assessment = await self.reasoner.assess_quality(task, result)
    if assessment.score < threshold:
        # Refine the result
        refined = await self._refine_result(task, result, assessment.feedback)
        return refined, True
    return result, False
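The quality gate is a threshold check followed by an optional refinement pass. A toy version makes the control flow concrete. Everything in this sketch is hypothetical: `assess` scores by word count and `refine` appends the feedback, standing in for the reasoner's quality assessment and the refinement step.

```python
from dataclasses import dataclass


@dataclass
class Assessment:
    score: float
    feedback: str


def assess(result: str) -> Assessment:
    """Toy scorer: two points per word, capped at 10 (hypothetical)."""
    score = min(10.0, len(result.split()) * 2.0)
    return Assessment(score=score, feedback="Add more detail")


def refine(result: str, feedback: str) -> str:
    """Toy refinement: append the feedback to the result (hypothetical)."""
    return f"{result} (revised: {feedback.lower()})"


def quality_phase(result: str, threshold: float = 7.0) -> tuple[str, bool]:
    """Return the result and whether it was refined."""
    assessment = assess(result)
    if assessment.score < threshold:
        return refine(result, assessment.feedback), True
    return result, False


print(quality_phase("Too short"))
# ('Too short (revised: add more detail)', True)
print(quality_phase("A sufficiently detailed answer here"))
# ('A sufficiently detailed answer here', False)
```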
.var/logs/execution_history.jsonl for routing decisions
gepa_max_metric_calls in config
make type-check before commits
docs/guides/dspy-agent-framework-integration.md