plugins/fastapi-backend/skills/fastapi-background/SKILL.md
This skill should be used when the user asks to "create background task", "add async job", "implement task queue", "schedule periodic task", "use Celery", "use ARQ", "process async", or mentions background processing, task queues, job scheduling, workers, or async jobs. Provides multiple task queue framework patterns.
npx skillsauth add markus41/claude FastAPI Background TasksInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.
pip install arq
# app/workers/config.py
from arq.connections import RedisSettings
from app.config import get_settings
settings = get_settings()
class WorkerSettings:
redis_settings = RedisSettings(
host=settings.redis_host,
port=settings.redis_port,
password=settings.redis_password,
database=1 # Separate from cache
)
# Job settings
max_jobs = 10
job_timeout = 300 # 5 minutes
keep_result = 3600 # 1 hour
queue_name = "default"
# Cron jobs
cron_jobs = []
# app/workers/tasks.py
from arq import cron
from typing import Dict, Any
import asyncio
async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str):
"""Send email asynchronously."""
email_service = ctx.get("email_service")
await email_service.send(to=to, subject=subject, body=body)
return {"status": "sent", "to": to}
async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str):
"""Process uploaded file (resize, convert, etc.)."""
storage = ctx.get("storage")
file_data = await storage.get(file_id)
# Process file
processed = await process_file(file_data)
# Save processed file
await storage.put(f"processed/{file_id}", processed)
return {"status": "processed", "file_id": file_id}
async def cleanup_expired(ctx: Dict[str, Any]):
"""Periodic cleanup of expired data."""
db = ctx.get("db")
result = await db.delete_expired()
return {"deleted": result.deleted_count}
# Cron job example
@cron(hour=2, minute=0) # Run at 2 AM daily
async def daily_report(ctx: Dict[str, Any]):
"""Generate daily report."""
report_service = ctx.get("report_service")
await report_service.generate_daily()
# app/workers/main.py
from arq import create_pool
from arq.connections import RedisSettings
from app.workers.config import WorkerSettings
from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report
from app.infrastructure.database import init_database
from app.services.email import EmailService
async def startup(ctx: Dict[str, Any]):
"""Worker startup - initialize services."""
await init_database()
ctx["email_service"] = EmailService()
ctx["db"] = get_db()
async def shutdown(ctx: Dict[str, Any]):
"""Worker shutdown - cleanup."""
await close_database()
class WorkerSettings(WorkerSettings):
functions = [send_email, process_upload, cleanup_expired]
cron_jobs = [daily_report]
on_startup = startup
on_shutdown = shutdown
# Run with: arq app.workers.main.WorkerSettings
# app/dependencies.py
from arq import ArqRedis, create_pool
from arq.connections import RedisSettings
async def get_task_queue() -> ArqRedis:
return await create_pool(RedisSettings())
# app/routes/users.py
from fastapi import Depends
from arq import ArqRedis
@router.post("/users/{user_id}/welcome")
async def send_welcome_email(
user_id: str,
queue: ArqRedis = Depends(get_task_queue)
):
user = await get_user(user_id)
# Enqueue background task
job = await queue.enqueue_job(
"send_email",
to=user.email,
subject="Welcome!",
body="Thanks for signing up."
)
return {"job_id": job.job_id, "status": "queued"}
@router.post("/uploads")
async def upload_file(
file: UploadFile,
user: User = Depends(get_current_user),
queue: ArqRedis = Depends(get_task_queue)
):
# Save file
file_id = await save_file(file)
# Enqueue processing
await queue.enqueue_job(
"process_upload",
file_id=file_id,
user_id=str(user.id),
_defer_by=5 # Delay 5 seconds
)
return {"file_id": file_id, "status": "processing"}
# app/workers/celery_app.py
from celery import Celery
from app.config import get_settings
settings = get_settings()
celery_app = Celery(
"worker",
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
include=["app.workers.celery_tasks"]
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=300,
worker_prefetch_multiplier=1,
)
# Periodic tasks (Celery Beat)
celery_app.conf.beat_schedule = {
"cleanup-every-hour": {
"task": "app.workers.celery_tasks.cleanup_expired",
"schedule": 3600.0,
},
"daily-report": {
"task": "app.workers.celery_tasks.generate_daily_report",
"schedule": crontab(hour=2, minute=0),
},
}
# app/workers/celery_tasks.py
from app.workers.celery_app import celery_app
import asyncio
def run_async(coro):
"""Helper to run async code in sync Celery tasks."""
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
@celery_app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str):
try:
run_async(_send_email_async(to, subject, body))
return {"status": "sent", "to": to}
except Exception as exc:
self.retry(exc=exc, countdown=60)
@celery_app.task
def process_upload(file_id: str, user_id: str):
run_async(_process_upload_async(file_id, user_id))
return {"status": "processed", "file_id": file_id}
# app/workers/dramatiq_app.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
redis_broker = RedisBroker(url="redis://localhost:6379/0")
result_backend = RedisBackend(url="redis://localhost:6379/1")
redis_broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(redis_broker)
# app/workers/dramatiq_tasks.py
import dramatiq
@dramatiq.actor(max_retries=3, min_backoff=1000)
def send_email(to: str, subject: str, body: str):
# Sync implementation
return {"status": "sent", "to": to}
@dramatiq.actor(time_limit=300000) # 5 min timeout
def process_upload(file_id: str, user_id: str):
return {"status": "processed", "file_id": file_id}
For simple fire-and-forget tasks (no persistence):
from fastapi import BackgroundTasks
async def write_log(message: str):
with open("log.txt", "a") as f:
f.write(f"{message}\n")
@router.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, message)
return {"status": "logged"}
For detailed patterns:
references/arq-advanced.md - ARQ advanced patterns, retries, prioritiesreferences/celery-patterns.md - Celery best practices, chains, groupsreferences/monitoring.md - Flower, task monitoringWorking examples in examples/:
examples/arq_worker.py - Complete ARQ workerexamples/celery_app.py - Celery configurationexamples/task_service.py - Task enqueueing servicedevelopment
Enhanced plan-authoring skill with Pre-Writing context gathering, task metadata, non-TDD templates, Red Flags, telemetry, and an automated plan linter. Use when you have a spec or requirements for a multi-step task, before touching code.
tools
Documentation intelligence engine with graph-based API docs, algorithm library, and drift detection
tools
Ultraplan cloud planning — kick off a plan in the cloud from your terminal, review and revise in the browser, then execute remotely or send back to CLI
tools
--- name: mcp description: Configure MCP servers for Claude Code — stdio vs HTTP, authentication, Tools/Resources/Prompts distinction, channels (CI webhook, mobile relay, Discord bridge, fakechat), and cost of always-loaded tools. Use this skill whenever adding an MCP server, debugging connection issues, choosing between MCP Tools vs Prompts vs Resources, installing channel servers, or managing .mcp.json. Triggers on: "MCP server", "mcp config", "add Obsidian MCP", "install context7", "channels"