packages/opencode-swarm-plugin/global-skills/queue/SKILL.md
Guide for job queue patterns in multi-agent coordination. Use when deciding between background jobs vs inline execution, submitting long-running tasks, monitoring job progress, and handling failures. Covers when to queue, job priority, retry strategies, and monitoring patterns.
npx skillsauth add joelhooks/swarm-tools queueInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Reliable job processing for multi-agent workflows using BullMQ and Redis.
Queue jobs when:
Use inline execution when:
import { createSwarmQueue } from 'swarm-queue';
const queue = createSwarmQueue({
name: 'embeddings',
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000, // Start at 2 seconds, exponential backoff
},
removeOnComplete: true, // Clean up completed jobs
},
});
// Basic job
const jobId = await queue.addJob('generate-embedding', {
text: 'Hello, world!',
model: 'text-embedding-3-small',
});
// With priority (0=urgent, 1=high, 2=normal, 3=low)
const jobId = await queue.addJob(
'agent-task',
{ agentId: 'worker-1', task: 'analyze' },
{ priority: 1 } // Process before normal jobs
);
// Delayed job (start processing after delay)
const jobId = await queue.addJob(
'retry-agent',
{ attempt: 2 },
{ delay: 30000 } // Process in 30 seconds
);
// With custom retry strategy
const jobId = await queue.addJob(
'webhook-call',
{ url: 'https://example.com/webhook' },
{
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, 16s
},
}
);
// Get job details
const job = await queue.getJob(jobId);
if (job) {
console.log({
state: await job.getState(), // 'waiting' | 'active' | 'completed' | 'failed'
progress: job.progress(), // 0-100
attempts: job.attemptsMade,
failedReason: job.failedReason,
});
}
// Get queue metrics
const metrics = await queue.getMetrics();
console.log(metrics);
// {
// waiting: 42, // Jobs queued, not started
// active: 5, // Jobs being processed
// completed: 1000,// Successfully finished
// failed: 3, // Failed after retries
// delayed: 0 // Delayed jobs
// }
// Remove a job (works from any state)
await queue.removeJob(jobId);
Workers process jobs from the queue. Start workers in separate processes or services.
import { createWorker } from 'swarm-queue';
const worker = await createWorker(
{
queueName: 'embeddings',
concurrency: 4, // Process 4 jobs in parallel
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
},
async (job) => {
try {
const { text, model } = job.data.payload;
// Update progress
job.updateProgress(25);
// Do the work
const embedding = await generateEmbedding(text, model);
job.updateProgress(100);
// Return result
return {
success: true,
data: { embedding, dimensions: embedding.length },
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
);
// Start processing
await worker.start();
// Graceful shutdown
process.on('SIGTERM', async () => {
await worker.stop();
await worker.close();
});
// Urgent: agent error requiring immediate response
await queue.addJob('notify-coordinator', { error }, { priority: 0 });
// High: task completion from other agents
await queue.addJob('merge-results', { results }, { priority: 1 });
// Normal: progress updates
await queue.addJob('update-metrics', { metrics }, { priority: 2 });
// Low: cleanup, archival
await queue.addJob('cleanup-cache', { cacheKey }, { priority: 3 });
BullMQ automatically processes higher priority jobs first:
// Queue metrics will show that higher priority jobs are processed first
const metrics = await queue.getMetrics();
console.log(`Urgent jobs waiting: ${metrics.waiting}`);
// Worker will process urgent (priority 0) jobs before normal ones
const jobId = await queue.addJob('api-call', { endpoint: '/data' }, {
attempts: 4,
backoff: {
type: 'exponential',
delay: 1000, // First retry: 1s, then 2s, 4s, 8s
},
});
Retry timeline: 1s → 2s → 4s → 8s → Failed (moves to dead-letter)
const jobId = await queue.addJob('webhook', { url }, {
attempts: 3,
backoff: {
type: 'fixed',
delay: 5000, // Always wait 5 seconds between retries
},
});
After max retries, jobs fail and are no longer retried:
// Monitor failures
const metrics = await queue.getMetrics();
if (metrics.failed > 0) {
console.warn(`${metrics.failed} jobs have permanently failed`);
// Log to monitoring, alert on-call, etc.
}
const metrics = await queue.getMetrics();
const queueHealth = {
throughput: metrics.completed, // Total completed
backlog: metrics.waiting, // Jobs not yet started
inProgress: metrics.active, // Currently being processed
failureRate: metrics.failed / (metrics.completed + metrics.failed),
avgTimeInQueue: null, // Custom tracking needed
};
console.log(`Queue health: ${JSON.stringify(queueHealth, null, 2)}`);
// Example: Monitor queue health
setInterval(async () => {
const metrics = await queue.getMetrics();
const total = Object.values(metrics).reduce((a, b) => a + b);
if (metrics.failed > 0.1 * total) {
console.error('High failure rate detected');
// Alert monitoring system
}
if (metrics.waiting > 1000) {
console.warn('Large backlog detected - consider scaling workers');
}
}, 30000);
// Agent submits: Generate embeddings for documents
const jobId = await queue.addJob('generate-embedding', {
documentId: 'doc-123',
text: 'Full document text here...',
model: 'text-embedding-3-small',
}, {
priority: 2,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
// Worker: Process embedding job
const result = await generateEmbedding(payload.text, payload.model);
// Agent submits: Extract text from PDF
const jobId = await queue.addJob('process-pdf', {
fileUrl: 'https://example.com/document.pdf',
pages: [1, 2, 3], // Only extract specific pages
}, {
priority: 1,
attempts: 2,
removeOnComplete: true,
});
// Worker: Long-running PDF processing
const text = await extractPdfText(payload.fileUrl, payload.pages);
// Agent submits: Process 1000 items in background
const jobId = await queue.addJob('bulk-update', {
items: largeDataset,
operation: 'transform',
}, {
priority: 3, // Low priority, can take time
attempts: 1, // No retry for bulk (resumability is application-specific)
});
// Worker: Stream-based processing
for (const item of payload.items) {
await processItem(item);
job.updateProgress(++processed / payload.items.length * 100);
}
Since queue jobs are async, use polling or events for results:
// Agent A: Submit job and poll for result
const jobId = await queue.addJob('analyze-data', { data });
// Polling (simple but not ideal for long waits)
let result = null;
const maxAttempts = 60; // 60 * 5sec = 5 minutes
for (let i = 0; i < maxAttempts; i++) {
const job = await queue.getJob(jobId);
const state = await job?.getState();
if (state === 'completed') {
result = job?.returnvalue;
break;
}
await new Promise(r => setTimeout(r, 5000)); // Wait 5s
}
if (result) {
console.log('Analysis complete:', result);
} else {
console.error('Job timeout');
}
// Job 1: Generate embeddings
const jobId1 = await queue.addJob('generate-embedding', { text });
// Job 2: After Job 1 completes, compare embeddings
// This is application-level coordination - worker for Job 1 submits Job 2
// Worker:
if (jobResult.success) {
const jobId2 = await queue.addJob('compare-embeddings', {
embedding: jobResult.data.embedding,
compareWith: otherEmbedding,
});
}
The swarm CLI provides queue management commands:
# Submit a job
swarm queue submit embeddings '{"text":"hello","model":"small"}' --priority 1
# Check job status
swarm queue status embeddings job-id-123
# List queue contents
swarm queue list embeddings --state waiting --limit 10
swarm queue list embeddings --state failed
# Start a worker
swarm worker embeddings --concurrency 4
# Cleanup old jobs
swarm queue cleanup embeddings --before 7d
// If job fails, provide fallback
const jobId = await queue.addJob('expensive-compute', { data });
try {
const job = await queue.getJob(jobId);
const state = await job?.getState();
if (state === 'failed') {
// Use fallback result
return getCachedResult(data) || getDefaultResult();
}
} catch (error) {
// Queue unavailable - use fallback
return getCachedResult(data);
}
// Monitor failed jobs and move to dead-letter handling
const metrics = await queue.getMetrics();
if (metrics.failed > 0) {
const failedJobs = []; // Fetch failed jobs from Redis
// Send to dead-letter queue for manual review or retrigger
for (const job of failedJobs) {
console.error(`Failed job ${job.id}: ${job.failedReason}`);
// Log to monitoring system
// Alert on-call engineer
}
}
// Low concurrency: Process fewer jobs in parallel (less resource use)
const worker = await createWorker(
{
queueName: 'embeddings',
concurrency: 1, // One at a time (good for expensive ops)
},
processor
);
// High concurrency: More parallel processing (higher throughput)
const worker = await createWorker(
{
queueName: 'simple-tasks',
concurrency: 16, // Max parallel (good for I/O-bound ops)
},
processor
);
// Remove completed jobs to save Redis memory
await queue.addJob('cleanup', { data }, {
removeOnComplete: true, // Don't keep history
attempts: 1, // No retry needed
});
// Keep important jobs for auditing
await queue.addJob('agent-task', { data }, {
removeOnComplete: false, // Retain history
attempts: 3, // Retry on failure
});
import { describe, test, expect } from 'bun:test';
describe('Queue Jobs', () => {
test('embedding job returns valid embedding', async () => {
const processor = async (job) => {
// Mock job for testing
return {
success: true,
data: { embedding: [0.1, 0.2, 0.3] },
};
};
const result = await processor({
data: { payload: { text: 'hello', model: 'small' } },
});
expect(result.success).toBe(true);
expect(result.data.embedding).toHaveLength(3);
});
});
test('job completes successfully in queue', async () => {
const queue = createSwarmQueue({ name: 'test-queue' });
const jobId = await queue.addJob('test-job', { data: 'test' });
const job = await queue.getJob(jobId);
expect(job).toBeDefined();
// Clean up
await queue.removeJob(jobId);
await queue.close();
});
development
Patterns for testing code effectively. Use when breaking dependencies for testability, adding tests to existing code, understanding unfamiliar code through characterization tests, or deciding how to structure tests. Covers seams, dependency injection, test doubles, and safe refactoring techniques from Michael Feathers.
tools
Principles for building reusable coding systems. Use when designing modules, APIs, CLIs, or any code meant to be used by others. Based on "A Philosophy of Software Design" by John Ousterhout. Covers deep modules, complexity management, and design red flags.
development
Multi-agent coordination patterns for OpenCode swarm workflows. Use when working on complex tasks that benefit from parallelization, when coordinating multiple agents, or when managing task decomposition. Do NOT use for simple single-agent tasks.
development
Meta-skill for generating new skills with proper format and structure. Use when creating new skills for the swarm system or when agents need to generate skill scaffolds. Ensures skills follow conventions (frontmatter format, directory structure, bundled resources).