frontier-python-ts/skills/arq/SKILL.md
Use when implementing background jobs, task queues, scheduled jobs, retries, or any Redis-backed async worker in a Python backend service in this harness. arq is the chosen queue (async-native, Redis-backed). Replaces the old bullmq skill.
arq is a small, async-native job queue backed by Redis. It is the only background job system in this harness.
| Need | Use |
|---|---|
| Trivial fire-and-forget that completes inside the request lifecycle | FastAPI BackgroundTasks |
| Anything that must survive a process restart | arq |
| Anything that must retry on failure | arq |
| Anything that must run on a schedule (cron) | arq |
| Anything that hits a slow third-party API | arq |
| Anything where the user does not need to wait for the result | arq |
If in doubt, use arq.
uv add arq
app/
  workers/
    __init__.py
    settings.py   # WorkerSettings: defines functions, schedule, redis URL
    jobs/
      email.py
      reports.py
A job is an async def function that takes ctx as its first argument. ctx is a dict; arq populates it with the redis pool and the job metadata.
# app/workers/jobs/email.py
from typing import Any

import structlog

log = structlog.get_logger()


async def send_welcome_email(ctx: dict[str, Any], user_id: str, email: str) -> None:
    log.info("send_welcome_email.start", user_id=user_id, job_id=ctx["job_id"])
    # Call your email service here
    log.info("send_welcome_email.done", user_id=user_id)
Rules:
- ctx first, then arguments. Arguments are pickled and stored in Redis, so keep them small and primitive (IDs, strings, numbers; never ORM objects).
- Make jobs idempotent. A job may run more than once, so give it a dedupe_key (job ID, request ID) that the job checks before doing real work.
- Set a job_timeout per worker (see settings below). A job that hangs forever blocks a worker slot.

# app/workers/settings.py
from arq.connections import RedisSettings
from arq.cron import cron

from app.core.config import get_settings
from app.core.db import async_session_maker
from app.core.logging import configure_logging
from app.workers.jobs.email import send_welcome_email
from app.workers.jobs.reports import generate_daily_report


async def startup(ctx: dict) -> None:
    configure_logging()
    ctx["session_maker"] = async_session_maker


async def shutdown(ctx: dict) -> None:
    pass


class WorkerSettings:
    functions = [send_welcome_email]
    cron_jobs = [
        cron(generate_daily_report, hour=2, minute=0),  # 02:00 UTC daily
    ]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = RedisSettings.from_dsn(get_settings().redis_url.unicode_string())
    max_jobs = 20
    job_timeout = 300   # seconds; kill any job that runs longer
    keep_result = 3600  # seconds; how long results stay in Redis
    max_tries = 3       # total attempts before the job is marked failed (no dead-letter queue)
Run the worker with: uv run arq app.workers.settings.WorkerSettings.
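The idempotency rule for jobs (check a dedupe key before doing real work) can be sketched as follows. This is a minimal illustration, not the harness's implementation: `FakeRedis`, `send_receipt`, the `dedupe:` key prefix, and the `ctx["sent"]` list are hypothetical names, and the fake only mimics the `set(..., nx=True, ex=...)` call of the async Redis client that arq places in `ctx["redis"]`.

```python
import asyncio
from typing import Any, Optional


class FakeRedis:
    """In-memory stand-in for the async client arq exposes as ctx["redis"]."""

    def __init__(self) -> None:
        self._keys: dict[str, str] = {}

    async def set(self, name: str, value: str, *, nx: bool = False, ex: Optional[int] = None):
        # Like redis-py: with nx=True the call returns None if the key exists.
        if nx and name in self._keys:
            return None
        self._keys[name] = value  # expiry (ex) is ignored by this fake
        return True


async def send_receipt(ctx: dict[str, Any], order_id: str) -> str:
    # Claim the dedupe key first; only the first run for this order proceeds.
    key = f"dedupe:send_receipt:{order_id}"
    if not await ctx["redis"].set(key, "1", nx=True, ex=86_400):
        return "skipped"
    ctx["sent"].append(order_id)  # stand-in for the real side effect
    return "sent"


async def demo() -> tuple[list[str], list[str]]:
    ctx: dict[str, Any] = {"redis": FakeRedis(), "sent": []}
    outcomes = [await send_receipt(ctx, "o-1"), await send_receipt(ctx, "o-1")]
    return outcomes, ctx["sent"]
```

Running `asyncio.run(demo())` executes the job twice for the same order and performs the side effect once. Claiming the key before the work means a crash midway leaves the key set, so a later run would skip; if a missed run is worse than a duplicate, delete the key on failure or set it only after the work succeeds.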
The Redis pool is shared with the API. Create it in the FastAPI lifespan and store it on app.state.
# app/main.py
from contextlib import asynccontextmanager

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI

from app.core.config import get_settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()
    app.state.redis = await create_pool(
        RedisSettings.from_dsn(settings.redis_url.unicode_string())
    )
    yield
    await app.state.redis.close()
# app/api/v1/users.py
from fastapi import Depends, Request


@router.post("/users", response_model=UserRead, status_code=201)
async def create_user(
    payload: UserCreate,
    request: Request,
    db=Depends(get_db),
) -> UserRead:
    user = await user_service.create(db, payload)
    # Enqueue by function name; pass only primitives (ID and email string).
    await request.app.state.redis.enqueue_job(
        "send_welcome_email",
        user.id,
        user.email,
    )
    return user
Rules:
- Call db.flush() before enqueueing so the row exists, but be aware the request transaction may still roll back. Delivery is at-least-once, so the job should SELECT the row and exit cleanly if it does not exist.
- Wrap each enqueue call in a typed helper:

async def enqueue_send_welcome(redis, *, user_id: str, email: str) -> None:
    await redis.enqueue_job("send_welcome_email", user_id, email)
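A typed helper is also a convenient place to enforce uniqueness at enqueue time. arq's `enqueue_job` accepts a `_job_id` keyword and returns `None` when a job with that ID already exists. The sketch below exercises that behaviour against a hypothetical `FakePool` so it runs without Redis; the `welcome:{user_id}` ID scheme and the boolean return value are illustrative assumptions, not part of the harness.

```python
import asyncio
from typing import Any, Optional


class FakePool:
    """Stand-in for arq's pool: records jobs and rejects duplicate job IDs."""

    def __init__(self) -> None:
        self.jobs: dict[str, tuple[str, tuple[Any, ...]]] = {}

    async def enqueue_job(
        self, function: str, *args: Any, _job_id: Optional[str] = None
    ) -> Optional[str]:
        # The real pool returns a Job object, or None if the ID already exists.
        job_id = _job_id or f"auto-{len(self.jobs)}"
        if job_id in self.jobs:
            return None
        self.jobs[job_id] = (function, args)
        return job_id


async def enqueue_send_welcome(redis: Any, *, user_id: str, email: str) -> bool:
    """Enqueue the welcome email once per user; False means it was already queued."""
    job = await redis.enqueue_job(
        "send_welcome_email", user_id, email, _job_id=f"welcome:{user_id}"
    )
    return job is not None


async def demo() -> tuple[bool, bool, list[str]]:
    pool = FakePool()
    first = await enqueue_send_welcome(pool, user_id="u1", email="user@example.com")
    second = await enqueue_send_welcome(pool, user_id="u1", email="user@example.com")
    return first, second, list(pool.jobs)
```

Keyword-only arguments keep call sites readable, and the job name string appears in exactly one place.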
Jobs do not use the FastAPI get_db dependency. They make their own session from the session_maker placed in ctx by startup.
async def generate_daily_report(ctx: dict[str, Any]) -> None:
    session_maker = ctx["session_maker"]
    async with session_maker() as db:
        async with db.begin():
            # do work
            ...
        # `async with db.begin()` commits on exit
Each job gets its own session and its own transaction. Never reuse sessions across jobs.
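The enqueueing rule that a job should look up its row and exit cleanly when the row is missing might look like the sketch below. It assumes a session maker in `ctx` as described above; `FakeSession`, `fake_session_maker`, `get_email`, and the `USERS` dict are hypothetical stand-ins for a real SQLAlchemy session and query, used only so the example runs on its own.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, Optional

USERS = {"u1": "user@example.com"}  # hypothetical stand-in for the users table


class FakeSession:
    async def get_email(self, user_id: str) -> Optional[str]:
        # A real job would run a SELECT by primary key here.
        return USERS.get(user_id)


@asynccontextmanager
async def fake_session_maker():
    # Plays the role of ctx["session_maker"](): one fresh session per job run.
    yield FakeSession()


async def send_welcome_email(ctx: dict[str, Any], user_id: str) -> str:
    async with ctx["session_maker"]() as db:
        email = await db.get_email(user_id)
    if email is None:
        # The request rolled back or the row was deleted: nothing to do.
        return "skipped: user not found"
    return f"sent to {email}"


async def demo() -> list[str]:
    ctx = {"session_maker": fake_session_maker}
    return [await send_welcome_email(ctx, "u1"), await send_welcome_email(ctx, "missing")]
```

Returning normally, rather than raising, keeps a rolled-back request from producing a failed job in the logs.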
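- arq re-runs a job, up to max_tries attempts in total, when the job raises Retry or is cancelled by a worker shutdown. Any other unhandled exception marks the job as failed without a retry, so convert retryable errors into Retry.
- Raise Retry(defer=...) to schedule a retry with a custom delay (e.g. exponential backoff for rate-limited APIs):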
from arq.worker import Retry


async def call_third_party(ctx, payload):
    try:
        await client.post(...)
    except RateLimitError as exc:
        raise Retry(defer=exc.retry_after_seconds)
- After max_tries, the job is logged and dropped. There is no built-in dead-letter queue; if you need durable failure inspection, write the job and its error to a database table from the failing job's except block before re-raising.

cron(...) entries in WorkerSettings.cron_jobs run on the same worker. For non-trivial schedules, prefer one explicit cron line per job over clever conditionals: they are easier to read and harder to break.
cron_jobs = [
    cron(generate_daily_report, hour=2, minute=0),
    cron(reconcile_billing, hour=3, minute=0),
    cron(cleanup_expired_sessions, minute={0, 30}),  # every 30 min
]
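Returning to the Retry(defer=...) guidance in the retry rules: when the third-party API does not say how long to wait, the delay can be derived from the attempt number, which arq exposes as `ctx["job_try"]` (starting at 1). The helper below is a small sketch of capped exponential backoff; `backoff_seconds` and its `base` and `cap` defaults are illustrative choices, not values from the harness.

```python
def backoff_seconds(job_try: int, *, base: float = 5.0, cap: float = 300.0) -> float:
    """Delay before the next attempt: base * 2**(job_try - 1), capped at `cap`."""
    if job_try < 1:
        raise ValueError("job_try starts at 1")
    return min(cap, base * 2 ** (job_try - 1))


# Inside a job, this would be used roughly as:
#     raise Retry(defer=backoff_seconds(ctx["job_try"]))
```

With the defaults, attempts 1, 2, and 3 wait 5, 10, and 20 seconds, and the cap keeps later attempts from drifting past `job_timeout`-scale delays.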
Test the function directly with pytest-asyncio — do not spin up arq.
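import pytest

from app.workers.jobs.email import send_welcome_email


@pytest.mark.asyncio
async def test_send_welcome_email(monkeypatch):
    sent: list[str] = []

    async def fake_send(**kw):  # assumes the job awaits email_client.send(to=...)
        sent.append(kw["to"])

    monkeypatch.setattr("app.workers.jobs.email.email_client.send", fake_send)
    await send_welcome_email({"job_id": "test"}, user_id="u1", email="user@example.com")
    assert sent == ["user@example.com"]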
For an end-to-end test that exercises the queue itself, use arq.worker.Worker.run_check against an in-memory Redis (fakeredis.aioredis).
Common mistakes:
- Calling commit() inside a job that uses async with session_maker() as db: let async with db.begin(): manage the transaction.
- No job_timeout: one stuck job blocks a worker slot indefinitely.
- Using a synchronous Redis client (redis-py sync): wrong. arq uses the async client; do not mix.
- Wrong import direction: workers import from app.services/app.core; the app does not import from app.workers.
- Using app.state.redis for non-arq Redis operations: fine technically, but document it; arq's pool is sized for queue ops.