templates/skills/background-jobs/SKILL.md
# Background Jobs Patterns Queue-based job processing, scheduled tasks, and worker patterns. > **Template Usage:** Customize for your queue provider (BullMQ, Inngest, Trigger.dev, AWS SQS) and runtime. ## Job Types | Type | Use Case | Example | |------|----------|---------| | **Async Tasks** | Deferred processing | Send email, process image | | **Scheduled** | Recurring tasks | Daily reports, cleanup | | **Delayed** | Future execution | Reminder after 24h | | **Batch** | Process many items |
npx skillsauth add javeedishaq/ai-workflow-orchestrator templates/skills/background-jobsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Queue-based job processing, scheduled tasks, and worker patterns.
Template Usage: Customize for your queue provider (BullMQ, Inngest, Trigger.dev, AWS SQS) and runtime.
| Type | Use Case | Example | |------|----------|---------| | Async Tasks | Deferred processing | Send email, process image | | Scheduled | Recurring tasks | Daily reports, cleanup | | Delayed | Future execution | Reminder after 24h | | Batch | Process many items | Import CSV, bulk update | | Workflow | Multi-step processes | Onboarding, order fulfillment |
// lib/queue/connection.ts
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';
export const connection = new IORedis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: null, // Required for BullMQ
});
// Reusable queue factory
export function createQueue(name: string) {
return new Queue(name, {
connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: {
count: 1000, // Keep last 1000 completed
age: 24 * 3600, // Or 24 hours
},
removeOnFail: {
count: 5000, // Keep last 5000 failed
},
},
});
}
// lib/queue/queues.ts
import { createQueue } from './connection';
// Define typed queues
export const emailQueue = createQueue('email');
export const imageQueue = createQueue('image-processing');
export const notificationQueue = createQueue('notifications');
export const reportQueue = createQueue('reports');
// Job type definitions
export interface EmailJob {
to: string;
subject: string;
template: string;
data: Record<string, unknown>;
}
export interface ImageProcessingJob {
imageId: string;
operations: Array<'resize' | 'compress' | 'watermark'>;
outputPath: string;
}
export interface ReportJob {
type: 'daily' | 'weekly' | 'monthly';
accountId: string;
startDate: string;
endDate: string;
}
// lib/queue/producers.ts
import { emailQueue, imageQueue, reportQueue } from './queues';
import type { EmailJob, ImageProcessingJob, ReportJob } from './queues';
export const jobs = {
// Immediate job
async sendEmail(data: EmailJob) {
return emailQueue.add('send', data, {
priority: data.template === 'password-reset' ? 1 : 10,
});
},
// Delayed job
async sendReminderEmail(data: EmailJob, delayMs: number) {
return emailQueue.add('send', data, {
delay: delayMs,
});
},
// Scheduled/recurring job
async scheduleReport(data: ReportJob, cron: string) {
return reportQueue.add('generate', data, {
repeat: {
pattern: cron, // e.g., '0 9 * * *' for 9 AM daily
},
});
},
// Bulk jobs
async processImages(images: ImageProcessingJob[]) {
return imageQueue.addBulk(
images.map((data) => ({
name: 'process',
data,
}))
);
},
// Job with custom options
async processImage(data: ImageProcessingJob) {
return imageQueue.add('process', data, {
attempts: 5,
backoff: {
type: 'fixed',
delay: 5000,
},
timeout: 60000, // 1 minute timeout
});
},
};
// workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import { connection } from '@/lib/queue/connection';
import { EmailJob } from '@/lib/queue/queues';
import { sendEmail } from '@/lib/email';
import { createLogger } from '@/lib/logger';
const logger = createLogger('email-worker');
export const emailWorker = new Worker<EmailJob>(
'email',
async (job: Job<EmailJob>) => {
logger.info('Processing email job', {
jobId: job.id,
to: job.data.to,
template: job.data.template,
});
try {
await sendEmail(job.data);
logger.info('Email sent successfully', {
jobId: job.id,
to: job.data.to,
});
return { sent: true, timestamp: new Date().toISOString() };
} catch (error) {
logger.error('Email sending failed', {
jobId: job.id,
error: error instanceof Error ? error.message : 'Unknown',
attempt: job.attemptsMade,
});
throw error;
}
},
{
connection,
concurrency: 5, // Process 5 jobs concurrently
limiter: {
max: 100, // Max 100 jobs
duration: 60000, // Per minute (rate limiting)
},
}
);
// Event handlers
emailWorker.on('completed', (job) => {
logger.info('Job completed', { jobId: job.id });
});
emailWorker.on('failed', (job, err) => {
logger.error('Job failed', {
jobId: job?.id,
error: err.message,
attempts: job?.attemptsMade,
});
});
emailWorker.on('error', (err) => {
logger.error('Worker error', { error: err.message });
});
// workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import { ImageProcessingJob } from '@/lib/queue/queues';
export const imageWorker = new Worker<ImageProcessingJob>(
'image-processing',
async (job: Job<ImageProcessingJob>) => {
const { imageId, operations, outputPath } = job.data;
const totalSteps = operations.length;
for (let i = 0; i < operations.length; i++) {
const operation = operations[i];
// Update progress
await job.updateProgress({
step: i + 1,
totalSteps,
currentOperation: operation,
percent: Math.round(((i + 1) / totalSteps) * 100),
});
// Process operation
switch (operation) {
case 'resize':
await resizeImage(imageId);
break;
case 'compress':
await compressImage(imageId);
break;
case 'watermark':
await addWatermark(imageId);
break;
}
// Log for each step
await job.log(`Completed ${operation} (${i + 1}/${totalSteps})`);
}
return { outputPath, processedAt: new Date().toISOString() };
},
{
connection,
concurrency: 2, // Image processing is CPU intensive
}
);
// lib/inngest/client.ts
import { Inngest } from 'inngest';
export const inngest = new Inngest({
id: 'my-app',
eventKey: process.env.INNGEST_EVENT_KEY,
});
// Event types
export type Events = {
'user/created': { data: { userId: string; email: string } };
'order/placed': { data: { orderId: string; total: number } };
'report/scheduled': { data: { accountId: string; type: string } };
};
// lib/inngest/functions.ts
import { inngest } from './client';
// Simple function
export const sendWelcomeEmail = inngest.createFunction(
{ id: 'send-welcome-email' },
{ event: 'user/created' },
async ({ event, step }) => {
await step.run('send-email', async () => {
await emailService.send({
to: event.data.email,
template: 'welcome',
});
});
}
);
// Multi-step workflow
export const onboardingWorkflow = inngest.createFunction(
{ id: 'onboarding-workflow' },
{ event: 'user/created' },
async ({ event, step }) => {
// Step 1: Send welcome email
await step.run('send-welcome', async () => {
await emailService.send({
to: event.data.email,
template: 'welcome',
});
});
// Step 2: Wait 24 hours
await step.sleep('wait-24h', '24 hours');
// Step 3: Send tips email
await step.run('send-tips', async () => {
await emailService.send({
to: event.data.email,
template: 'getting-started-tips',
});
});
// Step 4: Wait another 3 days
await step.sleep('wait-3-days', '3 days');
// Step 5: Check if user is active
const isActive = await step.run('check-activity', async () => {
const user = await db.user.findUnique({
where: { id: event.data.userId },
select: { lastActiveAt: true },
});
return user?.lastActiveAt && user.lastActiveAt > new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
});
// Step 6: Send re-engagement if inactive
if (!isActive) {
await step.run('send-reengagement', async () => {
await emailService.send({
to: event.data.email,
template: 're-engagement',
});
});
}
}
);
// Scheduled function
export const dailyReport = inngest.createFunction(
{ id: 'daily-report' },
{ cron: '0 9 * * *' }, // 9 AM daily
async ({ step }) => {
const accounts = await step.run('get-accounts', async () => {
return db.account.findMany({ where: { isActive: true } });
});
// Fan out to process each account
await Promise.all(
accounts.map((account) =>
step.run(`generate-report-${account.id}`, async () => {
await reportService.generateDaily(account.id);
})
)
);
}
);
// app/api/inngest/route.ts
import { serve } from 'inngest/next';
import { inngest } from '@/lib/inngest/client';
import {
sendWelcomeEmail,
onboardingWorkflow,
dailyReport,
} from '@/lib/inngest/functions';
export const { GET, POST, PUT } = serve({
client: inngest,
functions: [sendWelcomeEmail, onboardingWorkflow, dailyReport],
});
// In your application code
import { inngest } from '@/lib/inngest/client';
// After user creation
await inngest.send({
name: 'user/created',
data: {
userId: user.id,
email: user.email,
},
});
// After order placement
await inngest.send({
name: 'order/placed',
data: {
orderId: order.id,
total: order.total,
},
});
// trigger.ts
import { TriggerClient } from '@trigger.dev/sdk';
export const client = new TriggerClient({
id: 'my-app',
apiKey: process.env.TRIGGER_API_KEY!,
});
// jobs/process-order.ts
import { client } from '@/trigger';
import { eventTrigger } from '@trigger.dev/sdk';
import { z } from 'zod';
client.defineJob({
id: 'process-order',
name: 'Process Order',
version: '1.0.0',
trigger: eventTrigger({
name: 'order.created',
schema: z.object({
orderId: z.string(),
items: z.array(z.object({
productId: z.string(),
quantity: z.number(),
})),
}),
}),
run: async (payload, io) => {
// Reserve inventory
await io.runTask('reserve-inventory', async () => {
for (const item of payload.items) {
await inventoryService.reserve(item.productId, item.quantity);
}
});
// Process payment
const payment = await io.runTask('process-payment', async () => {
return paymentService.charge(payload.orderId);
});
// Send confirmation
await io.runTask('send-confirmation', async () => {
await emailService.sendOrderConfirmation(payload.orderId);
});
// Schedule delivery reminder
await io.sendEvent('schedule-reminder', {
name: 'order.reminder',
payload: { orderId: payload.orderId },
deliverAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24h later
});
return { success: true, paymentId: payment.id };
},
});
// lib/queue/scheduler.ts
import { reportQueue } from './queues';
export async function setupScheduledJobs() {
// Daily cleanup at 2 AM
await reportQueue.add(
'cleanup',
{ type: 'cleanup' },
{
repeat: {
pattern: '0 2 * * *',
tz: 'America/New_York',
},
jobId: 'daily-cleanup', // Prevent duplicates
}
);
// Weekly digest every Monday at 9 AM
await reportQueue.add(
'weekly-digest',
{ type: 'digest' },
{
repeat: {
pattern: '0 9 * * 1',
tz: 'America/New_York',
},
jobId: 'weekly-digest',
}
);
// Every 5 minutes health check
await reportQueue.add(
'health-check',
{ type: 'health' },
{
repeat: {
every: 5 * 60 * 1000,
},
jobId: 'health-check',
}
);
}
// lib/scheduler.ts
import cron from 'node-cron';
import { createLogger } from '@/lib/logger';
const logger = createLogger('scheduler');
export function startScheduler() {
// Daily cleanup at 2 AM
cron.schedule('0 2 * * *', async () => {
logger.info('Running daily cleanup');
try {
await cleanupService.run();
logger.info('Daily cleanup completed');
} catch (error) {
logger.error('Daily cleanup failed', { error });
}
});
// Every hour - sync external data
cron.schedule('0 * * * *', async () => {
logger.info('Running hourly sync');
await syncService.run();
});
logger.info('Scheduler started');
}
// lib/queue/retry-strategy.ts
import { Job } from 'bullmq';
export function getBackoffDelay(job: Job): number {
const attempt = job.attemptsMade;
// Exponential backoff with jitter
const baseDelay = 1000;
const maxDelay = 30 * 60 * 1000; // 30 minutes
const exponentialDelay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
const jitter = Math.random() * 1000;
return exponentialDelay + jitter;
}
// In worker
const worker = new Worker('my-queue', processor, {
connection,
settings: {
backoffStrategy: (attemptsMade) => {
return Math.min(1000 * Math.pow(2, attemptsMade), 30 * 60 * 1000);
},
},
});
// lib/queue/dlq.ts
import { Queue, Worker } from 'bullmq';
import { connection } from './connection';
export const deadLetterQueue = new Queue('dead-letter', { connection });
// Move failed jobs to DLQ after max attempts
export async function handleFailedJob(job: Job, error: Error) {
if (job.attemptsMade >= (job.opts.attempts || 3)) {
await deadLetterQueue.add('failed', {
originalQueue: job.queueName,
originalJobId: job.id,
data: job.data,
error: {
message: error.message,
stack: error.stack,
},
failedAt: new Date().toISOString(),
attempts: job.attemptsMade,
});
}
}
// DLQ processor - for manual review/retry
export const dlqWorker = new Worker(
'dead-letter',
async (job) => {
// Log for alerting
logger.error('Job in dead letter queue', {
originalQueue: job.data.originalQueue,
originalJobId: job.data.originalJobId,
error: job.data.error,
});
// Could send to Sentry, Slack, etc.
await alertService.sendAlert({
type: 'dlq',
job: job.data,
});
},
{ connection }
);
// app/api/admin/queues/route.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue, imageQueue, reportQueue } from '@/lib/queue/queues';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/api/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(imageQueue),
new BullMQAdapter(reportQueue),
],
serverAdapter,
});
export const GET = serverAdapter.getRouter();
export const POST = serverAdapter.getRouter();
// lib/queue/metrics.ts
import { Queue } from 'bullmq';
export async function getQueueMetrics(queue: Queue) {
const [waiting, active, completed, failed, delayed] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount(),
]);
return {
name: queue.name,
waiting,
active,
completed,
failed,
delayed,
total: waiting + active + completed + failed + delayed,
};
}
// Expose via API
export async function GET() {
const metrics = await Promise.all([
getQueueMetrics(emailQueue),
getQueueMetrics(imageQueue),
getQueueMetrics(reportQueue),
]);
return Response.json(metrics);
}
// Ensure jobs can be safely retried
async function processPayment(job: Job<PaymentJob>) {
const { orderId, amount } = job.data;
// Check if already processed
const existingPayment = await db.payment.findFirst({
where: {
orderId,
idempotencyKey: job.id, // Use job ID as idempotency key
},
});
if (existingPayment) {
return { alreadyProcessed: true, paymentId: existingPayment.id };
}
// Process payment
const payment = await paymentService.charge({
orderId,
amount,
idempotencyKey: job.id,
});
return { paymentId: payment.id };
}
// Set appropriate timeouts
const job = await queue.add('process', data, {
timeout: 30000, // 30 seconds
// Or use worker-level timeout
});
// In worker
const worker = new Worker(
'my-queue',
async (job) => {
// Use AbortController for cleanup
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 25000);
try {
await longRunningTask({ signal: controller.signal });
} finally {
clearTimeout(timeout);
}
},
{ connection }
);
tools
# Test Patterns Testing patterns for reliable, maintainable, and fast tests. > **Template Usage:** Customize for your test framework (Vitest, Jest, Playwright, etc.) and assertion library. ## Test Structure ```typescript // user.test.ts import { describe, it, expect, beforeEach, afterEach } from 'vitest'; import { userService } from '@/services/user.service'; import { createTestUser, cleanupTestData } from '@/tests/helpers'; describe('UserService', () => { let testUserId: string; befor
tools
# State Management Patterns Client-side state management patterns for modern applications. > **Template Usage:** Customize for your state library (React Query, Zustand, Jotai, Redux, etc.). ## State Categories | Type | Description | Solution | |------|-------------|----------| | **Server State** | Data from API/database | React Query, SWR | | **Client State** | UI state, user preferences | Zustand, Jotai, useState | | **Form State** | Form inputs, validation | React Hook Form, Formik | | **U
development
# Service Patterns Service layer patterns for clean architecture with proper error handling, logging, and type safety. > **Template Usage:** Customize for your ORM (Prisma, Drizzle, TypeORM, etc.) and logging solution. ## Result Type Pattern Never throw exceptions from services. Always return a Result type. ```typescript // lib/result.ts export type Result<T, E = Error> = | { success: true; data: T } | { success: false; error: E }; export function ok<T>(data: T): Result<T, never> { r
testing
# Row-Level Security Patterns Database security patterns for multi-tenant and user-scoped data. > **Template Usage:** Customize for your database (PostgreSQL, Supabase, etc.) and auth system. ## RLS Fundamentals ### Enable RLS on Tables ```sql -- Enable RLS (required before policies take effect) ALTER TABLE users ENABLE ROW LEVEL SECURITY; ALTER TABLE posts ENABLE ROW LEVEL SECURITY; ALTER TABLE comments ENABLE ROW LEVEL SECURITY; -- Force RLS for table owners too (recommended) ALTER TABLE