Agent skill
queue
Guide for job queue patterns in multi-agent coordination. Use when deciding between background jobs vs inline execution, submitting long-running tasks, monitoring job progress, and handling failures. Covers when to queue, job priority, retry strategies, and monitoring patterns.
Install this agent skill to your Project
npx add-skill https://github.com/joelhooks/swarm-tools/tree/main/packages/opencode-swarm-plugin/global-skills/queue
SKILL.md
Queue Skill
Reliable job processing for multi-agent workflows using BullMQ and Redis.
When to Use Background Jobs (Queue)
Queue jobs when:
- Long-running operations (>500ms) - Embedding generation, PDF processing, data analysis
- Resource-intensive work - ML inference, image transcoding, complex computations
- Fault tolerance matters - Can fail and retry without blocking the caller
- Scaling needed - Process multiple jobs in parallel with workers
- Async is acceptable - Caller doesn't need immediate result
- Rate limiting required - Control throughput with concurrency settings
- Workflow coordination - Chain tasks or wait for results asynchronously
When NOT to Use Background Jobs
Use inline execution when:
- Sub-100ms operations - Simple data transforms, validation, cache lookups
- Need immediate result - Caller blocks waiting for response
- No failure handling needed - Single request, no retry logic
- Stateless one-shots - No persistence or monitoring required
Job Queue API
Creating a Queue
import { createSwarmQueue } from 'swarm-queue';
const queue = createSwarmQueue({
name: 'embeddings',
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000, // Start at 2 seconds, exponential backoff
},
removeOnComplete: true, // Clean up completed jobs
},
});
Submitting Jobs
// Basic job
const jobId = await queue.addJob('generate-embedding', {
text: 'Hello, world!',
model: 'text-embedding-3-small',
});
// With priority (0=urgent, 1=high, 2=normal, 3=low)
const jobId = await queue.addJob(
'agent-task',
{ agentId: 'worker-1', task: 'analyze' },
{ priority: 1 } // Process before normal jobs
);
// Delayed job (start processing after delay)
const jobId = await queue.addJob(
'retry-agent',
{ attempt: 2 },
{ delay: 30000 } // Process in 30 seconds
);
// With custom retry strategy
const jobId = await queue.addJob(
'webhook-call',
{ url: 'https://example.com/webhook' },
{
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, 16s
},
}
);
Checking Job Status
// Get job details
const job = await queue.getJob(jobId);
if (job) {
console.log({
state: await job.getState(), // 'waiting' | 'active' | 'completed' | 'failed'
progress: job.progress(), // 0-100
attempts: job.attemptsMade,
failedReason: job.failedReason,
});
}
// Get queue metrics
const metrics = await queue.getMetrics();
console.log(metrics);
// {
// waiting: 42, // Jobs queued, not started
// active: 5, // Jobs being processed
// completed: 1000,// Successfully finished
// failed: 3, // Failed after retries
// delayed: 0 // Delayed jobs
// }
Canceling Jobs
// Remove a job (works from any state)
await queue.removeJob(jobId);
Creating Workers
Workers process jobs from the queue. Start workers in separate processes or services.
import { createWorker } from 'swarm-queue';
const worker = await createWorker(
{
queueName: 'embeddings',
concurrency: 4, // Process 4 jobs in parallel
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
},
async (job) => {
try {
const { text, model } = job.data.payload;
// Update progress
job.updateProgress(25);
// Do the work
const embedding = await generateEmbedding(text, model);
job.updateProgress(100);
// Return result
return {
success: true,
data: { embedding, dimensions: embedding.length },
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
);
// Start processing
await worker.start();
// Graceful shutdown
process.on('SIGTERM', async () => {
await worker.stop();
await worker.close();
});
Job Priority Patterns
Urgent vs Background
// Urgent: agent error requiring immediate response
await queue.addJob('notify-coordinator', { error }, { priority: 0 });
// High: task completion from other agents
await queue.addJob('merge-results', { results }, { priority: 1 });
// Normal: progress updates
await queue.addJob('update-metrics', { metrics }, { priority: 2 });
// Low: cleanup, archival
await queue.addJob('cleanup-cache', { cacheKey }, { priority: 3 });
Processing Priority in Workers
BullMQ automatically processes higher priority jobs first:
// Queue metrics will show that higher priority jobs are processed first
const metrics = await queue.getMetrics();
console.log(`Urgent jobs waiting: ${metrics.waiting}`);
// Worker will process urgent (priority 0) jobs before normal ones
Failure Handling and Retry Strategies
Exponential Backoff
const jobId = await queue.addJob('api-call', { endpoint: '/data' }, {
attempts: 4,
backoff: {
type: 'exponential',
delay: 1000, // First retry: 1s, then 2s, 4s, 8s
},
});
Retry timeline: 1s → 2s → 4s → 8s → Failed (moves to dead-letter)
Fixed Delay
const jobId = await queue.addJob('webhook', { url }, {
attempts: 3,
backoff: {
type: 'fixed',
delay: 5000, // Always wait 5 seconds between retries
},
});
Dead Letter Pattern
After max retries, jobs fail and are no longer retried:
// Monitor failures
const metrics = await queue.getMetrics();
if (metrics.failed > 0) {
console.warn(`${metrics.failed} jobs have permanently failed`);
// Log to monitoring, alert on-call, etc.
}
Monitoring and Observability
Real-Time Metrics
const metrics = await queue.getMetrics();
const queueHealth = {
throughput: metrics.completed, // Total completed
backlog: metrics.waiting, // Jobs not yet started
inProgress: metrics.active, // Currently being processed
failureRate: metrics.failed / (metrics.completed + metrics.failed),
avgTimeInQueue: null, // Custom tracking needed
};
console.log(`Queue health: ${JSON.stringify(queueHealth, null, 2)}`);
Monitoring Best Practices
- Track completion time: Record when jobs enter and exit the queue
- Failure alerts: Alert when failure rate exceeds threshold
- Backlog warnings: Warn when waiting jobs exceed capacity
- Worker health: Monitor worker process availability
- Job timeouts: Set reasonable timeout expectations per job type
// Example: Monitor queue health
setInterval(async () => {
const metrics = await queue.getMetrics();
const total = Object.values(metrics).reduce((a, b) => a + b);
if (metrics.failed > 0.1 * total) {
console.error('High failure rate detected');
// Alert monitoring system
}
if (metrics.waiting > 1000) {
console.warn('Large backlog detected - consider scaling workers');
}
}, 30000);
Common Job Types
Embedding Generation
// Agent submits: Generate embeddings for documents
const jobId = await queue.addJob('generate-embedding', {
documentId: 'doc-123',
text: 'Full document text here...',
model: 'text-embedding-3-small',
}, {
priority: 2,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
// Worker: Process embedding job
const result = await generateEmbedding(payload.text, payload.model);
PDF Processing
// Agent submits: Extract text from PDF
const jobId = await queue.addJob('process-pdf', {
fileUrl: 'https://example.com/document.pdf',
pages: [1, 2, 3], // Only extract specific pages
}, {
priority: 1,
attempts: 2,
removeOnComplete: true,
});
// Worker: Long-running PDF processing
const text = await extractPdfText(payload.fileUrl, payload.pages);
Bulk Operations
// Agent submits: Process 1000 items in background
const jobId = await queue.addJob('bulk-update', {
items: largeDataset,
operation: 'transform',
}, {
priority: 3, // Low priority, can take time
attempts: 1, // No retry for bulk (resumability is application-specific)
});
// Worker: Stream-based processing
for (const item of payload.items) {
await processItem(item);
job.updateProgress(++processed / payload.items.length * 100);
}
Task Coordination with Queues
Waiting for Results
Since queue jobs are async, use polling or events for results:
// Agent A: Submit job and poll for result
const jobId = await queue.addJob('analyze-data', { data });
// Polling (simple but not ideal for long waits)
let result = null;
const maxAttempts = 60; // 60 * 5sec = 5 minutes
for (let i = 0; i < maxAttempts; i++) {
const job = await queue.getJob(jobId);
const state = await job?.getState();
if (state === 'completed') {
result = job?.returnvalue;
break;
}
await new Promise(r => setTimeout(r, 5000)); // Wait 5s
}
if (result) {
console.log('Analysis complete:', result);
} else {
console.error('Job timeout');
}
Chaining Jobs
// Job 1: Generate embeddings
const jobId1 = await queue.addJob('generate-embedding', { text });
// Job 2: After Job 1 completes, compare embeddings
// This is application-level coordination - worker for Job 1 submits Job 2
// Worker:
if (jobResult.success) {
const jobId2 = await queue.addJob('compare-embeddings', {
embedding: jobResult.data.embedding,
compareWith: otherEmbedding,
});
}
CLI Usage (swarm queue)
The swarm CLI provides queue management commands:
# Submit a job
swarm queue submit embeddings '{"text":"hello","model":"small"}' --priority 1
# Check job status
swarm queue status embeddings job-id-123
# List queue contents
swarm queue list embeddings --state waiting --limit 10
swarm queue list embeddings --state failed
# Start a worker
swarm worker embeddings --concurrency 4
# Cleanup old jobs
swarm queue cleanup embeddings --before 7d
Error Handling Strategies
Graceful Degradation
// If job fails, provide fallback
const jobId = await queue.addJob('expensive-compute', { data });
try {
const job = await queue.getJob(jobId);
const state = await job?.getState();
if (state === 'failed') {
// Use fallback result
return getCachedResult(data) || getDefaultResult();
}
} catch (error) {
// Queue unavailable - use fallback
return getCachedResult(data);
}
Dead Letter Handling
// Monitor failed jobs and move to dead-letter handling
const metrics = await queue.getMetrics();
if (metrics.failed > 0) {
const failedJobs = []; // Fetch failed jobs from Redis
// Send to dead-letter queue for manual review or retrigger
for (const job of failedJobs) {
console.error(`Failed job ${job.id}: ${job.failedReason}`);
// Log to monitoring system
// Alert on-call engineer
}
}
Performance Tuning
Concurrency Settings
// Low concurrency: Process fewer jobs in parallel (less resource use)
const worker = await createWorker(
{
queueName: 'embeddings',
concurrency: 1, // One at a time (good for expensive ops)
},
processor
);
// High concurrency: More parallel processing (higher throughput)
const worker = await createWorker(
{
queueName: 'simple-tasks',
concurrency: 16, // Max parallel (good for I/O-bound ops)
},
processor
);
Job Options for Performance
// Remove completed jobs to save Redis memory
await queue.addJob('cleanup', { data }, {
removeOnComplete: true, // Don't keep history
attempts: 1, // No retry needed
});
// Keep important jobs for auditing
await queue.addJob('agent-task', { data }, {
removeOnComplete: false, // Retain history
attempts: 3, // Retry on failure
});
Testing
Unit Testing Jobs
import { describe, test, expect } from 'bun:test';
describe('Queue Jobs', () => {
test('embedding job returns valid embedding', async () => {
const processor = async (job) => {
// Mock job for testing
return {
success: true,
data: { embedding: [0.1, 0.2, 0.3] },
};
};
const result = await processor({
data: { payload: { text: 'hello', model: 'small' } },
});
expect(result.success).toBe(true);
expect(result.data.embedding).toHaveLength(3);
});
});
Integration Testing with Real Queue
test('job completes successfully in queue', async () => {
const queue = createSwarmQueue({ name: 'test-queue' });
const jobId = await queue.addJob('test-job', { data: 'test' });
const job = await queue.getJob(jobId);
expect(job).toBeDefined();
// Clean up
await queue.removeJob(jobId);
await queue.close();
});
Summary
- Use queues for long-running, fault-tolerant work
- Set appropriate priority and retry strategies
- Monitor queue health and failure rates
- Chain jobs at the application level
- Gracefully handle job failures with fallbacks
- Scale workers independently from the API
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
swarm-coordination
Multi-agent coordination patterns for OpenCode swarm workflows. Use when work benefits from parallelization or coordination. Covers: decomposition, worker spawning, file reservations, progress tracking, and review loops.
swarm-cli
Swarm CLI commands for workers - hivemind memory, hive tasks, swarmmail coordination. Use when working in a swarm context. Covers: swarm memory (find/store/get/stats), swarm cells (query/create/update/close), and coordination commands.
ralph-supervisor
Ralph loop pattern - Claude supervises while Codex (gpt-5.3-codex) executes implementation work. Use for autonomous coding loops with fresh context per iteration, validation gates, and git-backed persistence. Tools: ralph_init, ralph_story, ralph_iterate, ralph_loop, ralph_status, ralph_cancel, ralph_review.
always-on-guidance
Always-on rule-oriented guidance for claude-plugin agents. Use to align behavior, tool usage, and model-specific defaults while avoiding deprecated bd/cass references. Related skills: swarm-coordination, testing-patterns.
swarm-coordination
Multi-agent coordination patterns for OpenCode swarm workflows. Use when working on complex tasks that benefit from parallelization, when coordinating multiple agents, or when managing task decomposition. Do NOT use for simple single-agent tasks.
hive-workflow
Issue tracking and task management using the hive system. Use when creating, updating, or managing work items. Use when you need to track bugs, features, tasks, or epics. Do NOT use for simple one-off questions or explorations.
Didn't find tool you were looking for?