skills/py-async-patterns/SKILL.md
Async/await patterns for FastAPI and SQLAlchemy. Use when working with async code, database sessions, concurrent operations, or debugging async issues in Python.
npx skillsauth add cjharmath/claude-agents-skills py-async-patternsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Async Python is powerful but error-prone. Race conditions, session leaks, and connection pool issues are common pitfalls in async codebases.
Problem: Session must be scoped to request. Leaking sessions causes stale data and connection exhaustion.
# ✅ CORRECT: Session scoped to request via dependency
async def get_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
yield session
# Session automatically closed after request
# Usage in endpoint
@router.get("/users/{user_id}")
async def get_user(
user_id: UUID,
session: AsyncSession = Depends(get_session),
) -> UserRead:
result = await session.execute(select(User).where(User.id == user_id))
return result.scalar_one()
# ❌ WRONG: Global session (stale data, connection leaks)
_global_session = None # NEVER do this
async def get_user(user_id: UUID):
result = await _global_session.execute(...) # Stale, shared state
Why it matters: Each request needs isolated database state. Shared sessions see stale data and can't be safely committed.
Problem: Running independent queries sequentially wastes time. But dependent queries must be sequential.
# ✅ CORRECT: Concurrent independent queries
async def get_dashboard_data(user_id: UUID, session: AsyncSession):
# These don't depend on each other - run in parallel
user_result, stats_result, recent_result = await asyncio.gather(
session.execute(select(User).where(User.id == user_id)),
session.execute(select(UserStats).where(UserStats.user_id == user_id)),
session.execute(
select(Activity)
.where(Activity.user_id == user_id)
.order_by(Activity.created_at.desc())
.limit(10)
),
)
return {
"user": user_result.scalar_one(),
"stats": stats_result.scalar_one_or_none(),
"recent": recent_result.scalars().all(),
}
# ❌ WRONG: Sequential when parallel is safe
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
user = await session.execute(...) # Wait...
stats = await session.execute(...) # Wait more...
recent = await session.execute(...) # Even more waiting
# Total time = sum of all queries
# ✅ CORRECT: Sequential when queries depend on each other
async def get_user_with_team(user_id: UUID, session: AsyncSession):
# Must get user first to know team_id
user_result = await session.execute(
select(User).where(User.id == user_id)
)
user = user_result.scalar_one()
# Now we can query team
team_result = await session.execute(
select(Team).where(Team.id == user.team_id)
)
return user, team_result.scalar_one()
Decision framework:
| Queries share data? | Use |
|---------------------|-----|
| No (independent) | asyncio.gather() |
| Yes (dependent) | Sequential await |
Problem: Knowing when to commit, rollback, and refresh.
# ✅ CORRECT: Explicit transaction for multi-step operations
async def transfer_player(
player_id: UUID,
from_team_id: UUID,
to_team_id: UUID,
session: AsyncSession,
):
try:
# All operations in one transaction
player = await session.get(Player, player_id)
player.team_id = to_team_id
from_team = await session.get(Team, from_team_id)
from_team.player_count -= 1
to_team = await session.get(Team, to_team_id)
to_team.player_count += 1
await session.commit()
except Exception:
await session.rollback()
raise
# ✅ CORRECT: Using context manager
async with session.begin():
# All operations here are in a transaction
# Auto-commits on success, auto-rollbacks on exception
player.team_id = to_team_id
from_team.player_count -= 1
to_team.player_count += 1
# ✅ CORRECT: Refresh after commit to get DB-generated values
await session.commit()
await session.refresh(new_entity) # Get id, created_at, etc.
return new_entity
When to use what:
| Scenario | Pattern |
|----------|---------|
| Single create/update | session.add() + commit() at request end |
| Multi-step operation | Explicit begin() / commit() / rollback() |
| Need DB-generated values | refresh() after commit |
| Read-only query | No commit needed |
Problem: Exhausting connection pool causes requests to hang.
# This codebase uses NullPool for async - understand why
engine = create_async_engine(
DATABASE_URL,
poolclass=NullPool, # No connection pooling
)
# NullPool: Each request gets new connection, closes after
# Why: Avoids issues with asyncpg + connection reuse
# Tradeoff: Slightly more connection overhead
# ✅ CORRECT: Always close sessions (handled by Depends)
async with async_session() as session:
# Work with session
pass # Session closed here
# ❌ WRONG: Forgetting to close
session = async_session()
result = await session.execute(query)
# Session never closed - connection leak!
Problem: Long-running work shouldn't block the response.
from fastapi import BackgroundTasks
# ✅ CORRECT: FastAPI BackgroundTasks for request-scoped work
@router.post("/assessments/{id}/submit")
async def submit_assessment(
id: UUID,
session: AsyncSession = Depends(get_session),
background_tasks: BackgroundTasks,
) -> AssessmentResult:
# Quick work - return response
result = await process_submission(id, session)
# Slow work - do after response
background_tasks.add_task(send_completion_email, result.user_email)
background_tasks.add_task(update_analytics, result)
return result
# ✅ CORRECT: asyncio.create_task for fire-and-forget
async def process_with_side_effect():
result = await main_operation()
# Fire and forget - don't await
asyncio.create_task(log_to_external_service(result))
return result
# ❌ WRONG: Awaiting non-critical slow operations
async def slow_endpoint():
result = await main_operation()
await send_email(result) # User waits for email...
await update_analytics(result) # User still waiting...
return result
When to use what:
| Scenario | Pattern |
|----------|---------|
| Post-response cleanup | BackgroundTasks |
| Fire-and-forget logging | asyncio.create_task() |
| Must complete before response | Direct await |
Problem: Concurrent operations acquiring locks in different order.
# ❌ WRONG: Potential deadlock
async def transfer_both_ways():
# Task 1: Lock A, then B
# Task 2: Lock B, then A
# = Deadlock if interleaved
pass
# ✅ CORRECT: Consistent lock ordering
async def transfer_credits(
from_id: UUID,
to_id: UUID,
amount: int,
session: AsyncSession,
):
# Always lock in consistent order (e.g., by UUID)
first_id, second_id = sorted([from_id, to_id])
# Lock in consistent order
first = await session.get(Account, first_id, with_for_update=True)
second = await session.get(Account, second_id, with_for_update=True)
# Now safe to modify
if from_id == first_id:
first.balance -= amount
second.balance += amount
else:
second.balance -= amount
first.balance += amount
await session.commit()
Same principle as frontend - verify async operations succeeded:
# ✅ CORRECT: Validate after async operations
async def create_assessment(data: AssessmentCreate, session: AsyncSession):
assessment = Assessment(**data.model_dump())
session.add(assessment)
await session.commit()
await session.refresh(assessment)
# Validate post-condition
if assessment.id is None:
raise RuntimeError("Assessment creation failed - no ID assigned")
return assessment
# ✅ CORRECT: Validate data was actually loaded
async def get_user_or_fail(user_id: UUID, session: AsyncSession) -> User:
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(404, f"User {user_id} not found")
return user
import structlog
logger = structlog.get_logger()
async def complex_operation(user_id: UUID, session: AsyncSession):
logger.info("complex_operation.start", user_id=str(user_id))
try:
result = await step_one(session)
logger.debug("complex_operation.step_one_complete", result_count=len(result))
await step_two(result, session)
logger.debug("complex_operation.step_two_complete")
await session.commit()
logger.info("complex_operation.success", user_id=str(user_id))
except Exception as e:
logger.error("complex_operation.failed",
user_id=str(user_id),
error=str(e),
step="unknown"
)
raise
| Issue | Likely Cause | Solution |
|-------|--------------|----------|
| "Session is closed" | Using session after request ends | Keep session in request scope |
| Connection timeout | Pool exhausted | Check for session leaks |
| Stale data | Shared session or missing refresh | Scope session to request, refresh after commit |
| Deadlock | Inconsistent lock ordering | Always acquire locks in same order |
| Slow endpoint | Sequential queries that could be parallel | Use asyncio.gather() |
# Find potential session leaks (global sessions)
grep -rn "async_session()" --include="*.py" | grep -v "async with\|Depends"
# Find sequential queries that might be parallelizable
grep -rn "await session.execute" --include="*.py" -A2 | grep -B1 "await session.execute"
# Find missing awaits
ruff check --select=RUF006 # asyncio dangling task
development
Styling patterns for React web applications. Use when working with Tailwind CSS, CSS Modules, theming, responsive design, or component styling.
development
Navigation and routing patterns for React web applications. Use when implementing React Router, Next.js routing, deep links, or handling navigation state.
development
Building and deploying React web applications. Use when configuring builds, deploying to Vercel/Netlify, setting up CI/CD, Docker, or managing environments.
development
Authentication patterns for React web applications. Use when implementing login flows, OAuth, JWT handling, session management, or protected routes in React web apps.