skills/fastapi-patterns/SKILL.md
FastAPI best practices covering project structure, Pydantic v2 schemas, dependency injection, async handlers, authentication, authorization, transactional service layers, and testing with httpx and pytest.
npx skillsauth add affaan-m/everything-claude-code fastapi-patternsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Modern, production-grade FastAPI development: project layout, Pydantic v2 schemas, dependency injection, async patterns, auth, transactional service methods, and testing.
my_app/
|-- app/
| |-- main.py # App factory, lifespan, middleware
| |-- config.py # Settings via pydantic-settings
| |-- dependencies.py # Shared FastAPI dependencies
| |-- database.py # SQLAlchemy engine + session
| |-- routers/
| | `-- users.py
| |-- models/ # SQLAlchemy ORM models
| | `-- user.py
| |-- schemas/ # Pydantic request/response schemas
| | `-- user.py
| `-- services/ # Business logic layer
| `-- user_service.py
|-- tests/
| |-- conftest.py
| `-- test_users.py
|-- pyproject.toml
`-- .env
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.database import engine, Base
from app.routers import users
@asynccontextmanager
async def lifespan(app: FastAPI):
# Automatically create tables on startup for ease of use in dev/demo environments.
# For strict production applications, manage schemas via Alembic migrations instead.
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# Shutdown: close pooled resources.
await engine.dispose()
def create_app() -> FastAPI:
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins,
allow_credentials=settings.allow_credentials,
allow_methods=settings.allowed_methods,
allow_headers=settings.allowed_headers,
)
app.include_router(users.router, prefix="/users", tags=["users"])
return app
app = create_app()
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
app_name: str = "My App"
app_version: str = "0.1.0"
debug: bool = False
database_url: str
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# Pydantic-settings v2 safely evaluates mutable list literals directly
allowed_origins: list[str] = ["http://localhost:3000"]
allowed_methods: list[str] = ["GET", "POST", "PATCH", "DELETE", "OPTIONS"]
allowed_headers: list[str] = ["Authorization", "Content-Type"]
allow_credentials: bool = True
settings = Settings()
# app/schemas/user.py
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field, model_validator
class UserBase(BaseModel):
email: EmailStr
username: str = Field(min_length=3, max_length=50)
class UserCreate(UserBase):
password: str = Field(min_length=8)
password_confirm: str
@model_validator(mode="after")
def passwords_match(self) -> "UserCreate":
if self.password != self.password_confirm:
raise ValueError("Passwords do not match")
return self
class UserUpdate(BaseModel):
username: str | None = Field(default=None, min_length=3, max_length=50)
email: EmailStr | None = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
class UserListResponse(BaseModel):
total: int
items: list[UserResponse]
# app/dependencies.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import AsyncSessionLocal
from app.models.user import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/token")
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
except Exception:
await session.rollback()
raise
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
subject = payload.get("sub")
if subject is None:
raise credentials_exception
user_id = int(subject)
except (JWTError, TypeError, ValueError):
raise credentials_exception
user = await db.get(User, user_id)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if not current_user.is_active:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user")
return current_user
DbDep = Annotated[AsyncSession, Depends(get_db)]
CurrentUserDep = Annotated[User, Depends(get_current_user)]
ActiveUserDep = Annotated[User, Depends(get_current_active_user)]
# app/routers/users.py
from typing import Annotated
from fastapi import APIRouter, HTTPException, Query, status
from fastapi.security import OAuth2PasswordRequestForm
from app.dependencies import ActiveUserDep, DbDep
from app.schemas.user import UserCreate, UserResponse, UserUpdate, UserListResponse
from app.services.user_service import DuplicateUserError, UserService
router = APIRouter()
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(payload: UserCreate, db: DbDep) -> UserResponse:
service = UserService(db)
try:
return await service.create(payload)
except DuplicateUserError:
raise HTTPException(status_code=400, detail="Email already registered")
@router.get("/me", response_model=UserResponse)
async def get_me(current_user: ActiveUserDep) -> UserResponse:
return current_user
@router.get("/", response_model=UserListResponse)
async def list_users(
db: DbDep,
current_user: ActiveUserDep,
skip: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=100)] = 20,
) -> UserListResponse:
service = UserService(db)
users, total = await service.list(skip=skip, limit=limit)
return UserListResponse(total=total, items=users)
@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
payload: UserUpdate,
db: DbDep,
current_user: ActiveUserDep,
) -> UserResponse:
if current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not authorized")
service = UserService(db)
try:
user = await service.update(user_id, payload)
except DuplicateUserError:
raise HTTPException(status_code=400, detail="Email already registered")
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.post("/token")
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: DbDep,
) -> dict[str, str]:
service = UserService(db)
token = await service.authenticate(form_data.username, form_data.password)
if token is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return {"access_token": token, "token_type": "bearer"}
# app/services/user_service.py
from datetime import datetime, timedelta, timezone
from jose import jwt
from passlib.context import CryptContext
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class DuplicateUserError(Exception):
"""Raised when a unique user field conflicts with an existing row."""
class UserService:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def get_by_email(self, email: str) -> User | None:
result = await self.db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def create(self, payload: UserCreate) -> User:
user = User(
email=payload.email,
username=payload.username,
hashed_password=pwd_context.hash(payload.password),
)
self.db.add(user)
try:
# Rely on atomic DB constraints rather than race-prone application-level prechecks
await self.db.commit()
except IntegrityError as exc:
await self.db.rollback()
raise DuplicateUserError from exc
await self.db.refresh(user)
return user
async def list(self, skip: int = 0, limit: int = 20) -> tuple[list[User], int]:
total_result = await self.db.execute(select(func.count(User.id)))
total = total_result.scalar_one()
# Enforce explicit deterministic ordering to ensure reliable pagination
result = await self.db.execute(
select(User).order_by(User.id).offset(skip).limit(limit)
)
return list(result.scalars()), total
async def update(self, user_id: int, payload: UserUpdate) -> User | None:
user = await self.db.get(User, user_id)
if user is None:
return None
for field, value in payload.model_dump(exclude_unset=True).items():
setattr(user, field, value)
try:
await self.db.commit()
except IntegrityError as exc:
await self.db.rollback()
raise DuplicateUserError from exc
await self.db.refresh(user)
return user
async def authenticate(self, email: str, password: str) -> str | None:
user = await self.get_by_email(email)
if user is None or not pwd_context.verify(password, user.hashed_password):
return None
expire = datetime.now(timezone.utc) + timedelta(
minutes=settings.access_token_expire_minutes
)
return jwt.encode(
{"sub": str(user.id), "exp": expire},
settings.secret_key,
algorithm=settings.algorithm,
)
Note on Database Design: Application-level unique handling requires an underlying unique database index (e.g.,
unique=Trueon your SQLAlchemy mapping attributes). Without underlying constraints, application layer error-catching cannot safely prevent concurrent race conditions.
# tests/conftest.py
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.database import Base
from app.dependencies import get_db
from app.main import create_app
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
engine = create_async_engine(TEST_DATABASE_URL)
TestingSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
@pytest_asyncio.fixture(autouse=True)
async def setup_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest_asyncio.fixture
async def db_session():
async with TestingSessionLocal() as session:
yield session
await session.rollback()
@pytest_asyncio.fixture
async def client(db_session: AsyncSession):
app = create_app()
async def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
yield ac
@pytest_asyncio.fixture
async def registered_user(client: AsyncClient) -> dict:
resp = await client.post("/users/", json={
"email": "[email protected]",
"username": "testuser",
"password": "securepass1",
"password_confirm": "securepass1",
})
assert resp.status_code == 201
return resp.json()
@pytest_asyncio.fixture
async def auth_token(client: AsyncClient, registered_user: dict) -> str:
resp = await client.post("/users/token", data={
"username": "[email protected]",
"password": "securepass1",
})
assert resp.status_code == 200
return resp.json()["access_token"]
@pytest_asyncio.fixture
async def auth_client(client: AsyncClient, auth_token: str) -> AsyncClient:
client.headers.update({"Authorization": f"Bearer {auth_token}"})
return client
# Bad: business logic inside route handlers.
@router.post("/users/")
async def create_user(payload: UserCreate, db: DbDep):
hashed = bcrypt.hash(payload.password)
user = User(email=payload.email, hashed_password=hashed)
db.add(user)
await db.commit()
return user
# Good: thin route, transactional service handling.
@router.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(payload: UserCreate, db: DbDep):
try:
return await UserService(db).create(payload)
except DuplicateUserError:
raise HTTPException(status_code=400, detail="Email already registered")
# Bad: sync DB calls in async routes block the event loop.
@router.get("/items/")
async def list_items(db: Session = Depends(get_db)):
return db.query(Item).all()
# Good: use async SQLAlchemy executions.
@router.get("/items/")
async def list_items(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Item))
return result.scalars().all()
response_model to prevent accidental PII/data leaks and output clean OpenAPI schemas.DbDep = Annotated[AsyncSession, Depends(get_db)]..order_by(Model.id)) on all offset/limit paginated endpoints to avoid data skips.401 vs 403).tools
Garbage collection for your Claude Code configuration. Periodically scans ~/.claude (skills, memory, hooks, permissions, MCP servers, caches) for redundant, stale, orphaned, or low-value items, then walks the user through a confirm-each-deletion cleanup. Use when the user says "clean up my config", "config GC", "too many skills", "audit my setup", "my .claude is bloated", or asks for a periodic config review.
data-ai
当用户希望通过并行工作、并发 agents、批量工具调用、隔离 worktree 或多条独立验证通道来大幅加速任务、同时不损失正确性时使用。
documentation
在回答之前先读取仓库的实时状态,引导用户了解 ECC 当前的 agents、skills、命令、hooks、规则、安装配置档案以及项目接入流程。
testing
Fact-forcing gate that blocks Edit/Write/Bash (including MultiEdit) and demands concrete investigation (importers, data schemas, user instruction) before allowing the action. Measurably improves output quality by +2.25 points vs ungated agents.