skills/moai-alfred-workflow/SKILL.md
Enterprise multi-agent workflow orchestration specialist. Master workflow design, agent coordination, task delegation, and process automation with Context7 MCP integration and comprehensive monitoring. Build scalable, intelligent workflow systems with fault tolerance and performance optimization.
npx skillsauth add ajbcoding/claude-skill-eval moai-alfred-workflowInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
# Basic workflow setup
from alfred_workflow import WorkflowEngine, Agent
engine = WorkflowEngine()
spec_agent = Agent("spec-builder", domain="requirements")
impl_agent = Agent("tdd-implementer", domain="development")
test_agent = Agent("quality-gate", domain="testing")
workflow = engine.create_workflow("feature_development")
workflow.add_stage("specification", spec_agent)
workflow.add_stage("implementation", impl_agent, depends_on=["specification"])
workflow.add_stage("testing", test_agent, depends_on=["implementation"])
result = engine.execute(workflow, input_data={"feature": "user auth"})
# Context7 integration
from alfred_workflow import Context7Integration
context7 = Context7Integration()
examples = context7.search_code_examples(
query="react authentication", language="javascript", framework="react"
)
best_practices = context7.get_best_practices(
topic="database optimization", database="postgresql"
)
| Pattern | Use Case | Benefit | |---------|----------|---------| | Sequential | Linear tasks | Predictable flow | | Parallel | Independent tasks | Faster completion | | Conditional | Decision-based | Adaptive workflows | | Error Recovery | Fault tolerance | Reliable execution |
WorkflowEngine: Central orchestrator managing agents and workflows Agent System: Specialized agents for different domains (spec-builder, tdd-implementer, quality-gate) Task Model: Structured tasks with dependencies, priorities, and retry logic Template System: Reusable workflow patterns (feature development, bug fix)
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from typing import Dict, List, Any, Optional
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class AgentStatus(Enum):
IDLE = "idle"
BUSY = "busy"
ERROR = "error"
@dataclass
class Task:
id: str
name: str
description: str
agent_type: str
input_data: Dict[str, Any] = field(default_factory=dict)
dependencies: List[str] = field(default_factory=list)
status: TaskStatus = TaskStatus.PENDING
retry_count: int = 0
max_retries: int = 3
result: Optional[Dict[str, Any]] = None
@dataclass
class Agent:
id: str
name: str
agent_type: str
capabilities: List[str]
status: AgentStatus = AgentStatus.IDLE
current_task: Optional[Task] = None
class WorkflowEngine:
def __init__(self, max_concurrent_tasks: int = 5):
self.agents: Dict[str, Agent] = {}
self.workflows: Dict[str, 'Workflow'] = {}
self.max_concurrent_tasks = max_concurrent_tasks
def register_agent(self, agent: Agent) -> None:
self.agents[agent.id] = agent
def create_workflow(self, name: str, description: str = "") -> 'Workflow':
workflow = Workflow(name=name, description=description, engine=self)
self.workflows[name] = workflow
return workflow
async def execute_workflow(self, workflow: 'Workflow') -> Dict[str, Any]:
results = {}
for task in workflow.get_execution_order():
await self._wait_for_dependencies(task, results)
result = await self.execute_task(task)
results[task.id] = result
return results
@dataclass
class Workflow:
name: str
description: str
engine: WorkflowEngine
tasks: List[Task] = field(default_factory=list)
status: str = "created"
created_at: datetime = field(default_factory=datetime.now)
def add_stage(self, stage_name: str, agent_type: str,
input_data: Dict[str, Any] = None,
depends_on: List[str] = None) -> Task:
task = Task(
id=f"{stage_name}_{len(self.tasks)}",
name=stage_name,
description=f"Workflow stage: {stage_name}",
agent_type=agent_type,
input_data=input_data or {},
dependencies=depends_on or []
)
self.tasks.append(task)
return task
class Context7Integration:
def __init__(self, mcp_servers: List[str] = None):
self.mcp_servers = mcp_servers or []
self.cache = {}
self.cache_ttl = 3600
async def search_code_examples(self, query: str, language: str = None,
framework: str = None, limit: int = 10) -> List[Dict]:
cache_key = f"code_examples_{query}_{language}_{framework}_{limit}"
# Check cache first
if cache_key in self.cache:
cached = self.cache[cache_key]
if time.time() - cached['timestamp'] < self.cache_ttl:
return cached['data']
# Execute Context7 search
results = await self._context7_search(query, language, framework, limit)
# Cache results
self.cache[cache_key] = {'data': results, 'timestamp': time.time()}
return results
async def get_best_practices(self, topic: str, domain: str = None) -> Dict:
search_query = f"best practices {topic}"
if domain:
search_query += f" {domain}"
results = await self._context7_search(search_query, limit=5)
return results[0] if results else {}
from abc import ABC, abstractmethod
class WorkflowTemplate(ABC):
def __init__(self, name: str, description: str):
self.name = name
self.description = description
@abstractmethod
def create_workflow(self, engine: WorkflowEngine, config: Dict) -> Workflow:
pass
class FeatureDevelopmentTemplate(WorkflowTemplate):
def __init__(self):
super().__init__("feature_development", "End-to-end feature development")
def create_workflow(self, engine: WorkflowEngine, config: Dict) -> Workflow:
workflow = engine.create_workflow(
name=f"feature_{config.get('feature_name', 'unknown')}",
description=config.get('feature_description', '')
)
# Specification stage
spec_task = workflow.add_stage("specification", "spec-builder", {
"feature_description": config.get('feature_description', ''),
"requirements": config.get('requirements', [])
})
# Implementation stage
impl_task = workflow.add_stage("implementation", "tdd-implementer", {
"spec_id": spec_task.id,
"technology_stack": config.get('technology_stack', [])
}, depends_on=[spec_task.id])
# Testing stage
workflow.add_stage("testing", "quality-gate", {
"implementation_id": impl_task.id,
"test_types": config.get('test_types', ['unit', 'integration'])
}, depends_on=[impl_task.id])
return workflow
from enum import Enum
import uuid
import asyncio
class WorkflowPriority(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
class WorkflowScheduler:
def __init__(self, workflow_engine: WorkflowEngine):
self.workflow_engine = workflow_engine
self.workflow_queue = asyncio.Queue()
self.template_manager = WorkflowTemplateManager()
async def submit_workflow(self, template_name: str, config: Dict,
priority: WorkflowPriority = WorkflowPriority.MEDIUM,
scheduled_time: Optional[datetime] = None) -> str:
workflow_id = str(uuid.uuid4())
workflow = self.template_manager.create_workflow_from_template(
template_name, self.workflow_engine, config
)
if not workflow:
raise ValueError(f"Unknown template: {template_name}")
workflow_item = {
'workflow_id': workflow_id,
'workflow': workflow,
'priority': priority,
'scheduled_time': scheduled_time or datetime.now()
}
await self.workflow_queue.put((-priority.value, workflow_item))
return workflow_id
class WorkflowErrorHandler:
def __init__(self, workflow_engine: WorkflowEngine):
self.workflow_engine = workflow_engine
async def handle_task_failure(self, task: Task, error: Exception) -> bool:
if task.retry_count < task.max_retries:
return await self._retry_with_backoff(task, error)
if self._is_retriable_error(error):
return await self._retry_with_backoff(task, error)
else:
return await self._require_manual_intervention(task, error)
async def _retry_with_backoff(self, task: Task, error: Exception) -> bool:
delay = 2 ** task.retry_count
task.retry_count += 1
await asyncio.sleep(delay)
try:
await self.workflow_engine.execute_task(task)
return True
except Exception as e:
return await self.handle_task_failure(task, e)
def _is_retriable_error(self, error: Exception) -> bool:
retriable_errors = ['TimeoutError', 'ConnectionError', 'TemporaryFailure']
return type(error).__name__ in retriable_errors
class WorkflowMetrics:
def __init__(self):
self.metrics = {
'workflows_completed': 0,
'workflows_failed': 0,
'total_execution_time': 0.0,
'task_completion_times': []
}
def record_workflow_completion(self, workflow_id: str, execution_time: float, status: str):
if status == 'completed':
self.metrics['workflows_completed'] += 1
else:
self.metrics['workflows_failed'] += 1
self.metrics['total_execution_time'] += execution_time
def get_performance_summary(self) -> Dict[str, Any]:
completed = self.metrics['workflows_completed']
failed = self.metrics['workflows_failed']
total = completed + failed
if total == 0:
return {'error': 'No workflows executed'}
avg_time = self.metrics['total_execution_time'] / total if total > 0 else 0
success_rate = (completed / total) * 100 if total > 0 else 0
return {
'total_workflows': total,
'success_rate': f"{success_rate:.2f}%",
'average_execution_time': f"{avg_time:.2f}s",
'workflows_completed': completed,
'workflows_failed': failed
}
Use for:
Avoid for:
# Feature Development Workflow
workflow_id = await scheduler.submit_workflow(
"feature_development",
{
"feature_name": "user_authentication",
"feature_description": "Implement secure user login",
"requirements": ["JWT auth", "Password hashing"],
"acceptance_criteria": ["Users can login securely"]
}
)
# Bug Fix Workflow
bug_workflow_id = await scheduler.submit_workflow(
"bug_fix",
{
"bug_id": "BUG-001",
"bug_description": "Login fails with invalid credentials",
"error_logs": ["AuthException at line 42"],
"reproduction_steps": ["Enter invalid password"]
},
priority=WorkflowPriority.HIGH
)
TRUST Principles Applied:
Enterprise Security Features:
moai-alfred-agent-guide - Agent selection and delegation patternsmoai-alfred-spec-authoring - SPEC creation workflowsmoai-essentials-debug - Error handling and troubleshootingmoai-foundation-trust - Security and compliance principlesmoai-domain-backend - Backend-specific workflow patternsmoai-domain-testing - Testing workflow integrationEnterprise v4.0 Compliance: Progressive disclosure with comprehensive error handling, security controls, and monitoring.
Last Updated: 2025-11-13
Dependencies: Context7 MCP integration, Alfred agent system
See Also: examples.md for detailed usage examples
content-media
Download YouTube video transcripts when user provides a YouTube URL or asks to download/get/fetch a transcript from YouTube. Also use when user wants to transcribe or get captions/subtitles from a YouTube video.
development
Transform learning content (like YouTube transcripts, articles, tutorials) into actionable implementation plans using the Ship-Learn-Next framework. Use when user wants to turn advice, lessons, or educational content into concrete action steps, reps, or a learning quest.
tools
Toolkit for styling artifacts with a theme. These artifacts can be slides, docs, reportings, HTML landing pages, etc. There are 10 pre-set themes with colors/fonts that you can apply to any artifact that has been creating, or can generate a new theme on-the-fly.
tools
Replace with description of the skill and when Claude should use it.