Agent skill

RabbitMQ

Use RabbitMQ for reliable message queuing, pub/sub messaging, and task distribution with multiple exchange types.

Install this agent skill to your Project

npx add-skill https://github.com/hivellm/rulebook/tree/main/templates/skills/services/rabbitmq

SKILL.md

RabbitMQ Message Queue Instructions

CRITICAL: Use RabbitMQ for reliable message queuing, pub/sub messaging, and task distribution with multiple exchange types.

Core Features

Connection

typescript
// Using amqplib
import amqp from 'amqplib'

// Option 1: connect with a URL
const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost:5672')
const channel = await connection.createChannel()

// Option 2: connect with an options object (use this instead of the URL form above)
const connectionFromOptions = await amqp.connect({
  protocol: 'amqp',
  hostname: process.env.RABBITMQ_HOST || 'localhost',
  port: parseInt(process.env.RABBITMQ_PORT || '5672', 10),
  username: process.env.RABBITMQ_USER || 'guest',
  password: process.env.RABBITMQ_PASSWORD || 'guest',
  vhost: process.env.RABBITMQ_VHOST || '/',
})
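
The URL form and the options form above carry the same information. As a minimal sketch, assuming the `RABBITMQ_*` variable names used in this document, a hypothetical helper `buildAmqpUrl` (not part of amqplib) could turn the individual settings into a URL. Credentials and the vhost are percent-encoded, so the default vhost `/` becomes `%2F`.

```typescript
// Hypothetical helper, not part of amqplib: builds an AMQP URL from
// RABBITMQ_* style settings. In real code, pass process.env.
type Env = Record<string, string | undefined>

function buildAmqpUrl(env: Env): string {
  const host = env.RABBITMQ_HOST || 'localhost'
  const port = Number.parseInt(env.RABBITMQ_PORT || '5672', 10)
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid RABBITMQ_PORT: ${env.RABBITMQ_PORT}`)
  }
  // Percent-encode so characters such as '/', '@' or ':' cannot break the URL.
  const user = encodeURIComponent(env.RABBITMQ_USER || 'guest')
  const password = encodeURIComponent(env.RABBITMQ_PASSWORD || 'guest')
  const vhost = encodeURIComponent(env.RABBITMQ_VHOST || '/')
  return `amqp://${user}:${password}@${host}:${port}/${vhost}`
}
```

The result can be passed to `amqp.connect(...)` in place of a hardcoded string.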

Basic Operations

typescript
// Declare queue
await channel.assertQueue('tasks', {
  durable: true, // Survive broker restart
})

// Send message
channel.sendToQueue('tasks', Buffer.from(JSON.stringify({ task: 'process' })), {
  persistent: true, // Survive broker restart
})

// Consume messages
await channel.consume('tasks', async (msg) => {
  if (msg) {
    const content = JSON.parse(msg.content.toString())
    console.log('Received:', content)

    // Process message (awaited so the ack below only runs after processing finishes)
    await processTask(content)

    // Acknowledge
    channel.ack(msg)
  }
}, {
  noAck: false, // Manual acknowledgment
})

Exchange Types

typescript
// Queues must exist before they are bound, e.g. await channel.assertQueue('queue')
// publish() returns a boolean (false means the write buffer is full), not a promise

// Direct exchange
await channel.assertExchange('logs', 'direct', { durable: true })
await channel.bindQueue('queue', 'logs', 'error')
channel.publish('logs', 'error', Buffer.from('Error message'))

// Topic exchange
await channel.assertExchange('events', 'topic', { durable: true })
await channel.bindQueue('queue', 'events', 'user.*.created')
channel.publish('events', 'user.123.created', Buffer.from(JSON.stringify({ userId: 123 })))

// Fanout exchange (broadcast)
await channel.assertExchange('notifications', 'fanout', { durable: true })
await channel.bindQueue('queue1', 'notifications', '')
await channel.bindQueue('queue2', 'notifications', '')
channel.publish('notifications', '', Buffer.from('Broadcast message'))

// Headers exchange
await channel.assertExchange('headers_exchange', 'headers', { durable: true })
await channel.bindQueue('queue', 'headers_exchange', '', {
  'x-match': 'all', // All listed headers must match ('any' requires at least one)
  type: 'notification',
  priority: 'high',
})
channel.publish('headers_exchange', '', Buffer.from('Message'), {
  headers: { type: 'notification', priority: 'high' },
})
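
Topic bindings such as `user.*.created` follow two wildcard rules: `*` stands for exactly one dot-separated word and `#` stands for zero or more words. The sketch below is an illustrative local matcher for checking binding patterns in unit tests. The function name `topicMatches` is an assumption of this example, and the code is not the broker's own implementation.

```typescript
// Illustrative matcher for topic binding patterns (not the broker's code).
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const split = (s: string): string[] => (s === '' ? [] : s.split('.'))
  const p = split(pattern)
  const k = split(routingKey)

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === p.length) return j === k.length
    if (p[i] === '#') {
      // '#' either absorbs nothing (advance the pattern)
      // or absorbs one word and stays active (advance the key).
      return match(i + 1, j) || (j < k.length && match(i, j + 1))
    }
    if (j === k.length) return false
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1)
  }

  return match(0, 0)
}
```

For example, `topicMatches('user.*.created', 'user.123.created')` is true, while the same pattern does not match `user.created` because `*` needs exactly one word.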

Common Patterns

Work Queue (Task Distribution)

typescript
// Producer
async function publishTask(task: any) {
  await channel.assertQueue('tasks', { durable: true })
  channel.sendToQueue('tasks', Buffer.from(JSON.stringify(task)), {
    persistent: true,
  })
}

// Consumer
async function consumeTasks() {
  await channel.assertQueue('tasks', { durable: true })
  await channel.prefetch(1) // Process one message at a time

  await channel.consume('tasks', async (msg) => {
    if (msg) {
      try {
        const task = JSON.parse(msg.content.toString())
        await processTask(task)
        channel.ack(msg)
      } catch (error) {
        // Reject and requeue. Note: a message that always fails is redelivered
        // indefinitely; pass requeue=false to drop or dead-letter it instead.
        channel.nack(msg, false, true)
      }
    }
  })
}
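
Requeuing every failed message means a message that can never be processed is redelivered forever. One possible policy, sketched here under assumptions, is to requeue a message once and reject it without requeue when the broker has already marked it as redelivered. The `settle` function and the two interfaces are hypothetical names for this example; the interfaces only declare the parts of an amqplib channel and message that the sketch uses.

```typescript
// Minimal structural types so the sketch type-checks without amqplib.
// They mirror the fields and methods of amqplib's message and channel used here.
interface DeliveredMessage {
  fields: { redelivered: boolean }
}
interface AckChannel {
  ack(msg: DeliveredMessage): void
  nack(msg: DeliveredMessage, allUpTo?: boolean, requeue?: boolean): void
}
type Outcome = 'acked' | 'requeued' | 'rejected'

// Hypothetical helper: settles a message after processing has finished.
function settle(channel: AckChannel, msg: DeliveredMessage, failed: boolean): Outcome {
  if (!failed) {
    channel.ack(msg)
    return 'acked'
  }
  // First failure: requeue for one more attempt.
  // Already redelivered: reject without requeue, so the broker drops the
  // message or routes it to a dead letter exchange if one is configured.
  const requeue = !msg.fields.redelivered
  channel.nack(msg, false, requeue)
  return requeue ? 'requeued' : 'rejected'
}
```

Inside the consumer callback this would be called as `settle(channel, msg, false)` after `processTask` succeeds and `settle(channel, msg, true)` in the `catch` branch. The `redelivered` flag is also set when a previous consumer crashed, so this policy allows fewer retries than a counter stored in message headers would.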

Pub/Sub

typescript
// Publisher
async function publishEvent(eventType: string, data: any) {
  await channel.assertExchange('events', 'topic', { durable: true })
  channel.publish('events', eventType, Buffer.from(JSON.stringify(data)), {
    persistent: true,
  })
}

// Subscriber
async function subscribeToEvents(routingKey: string, handler: (data: any) => void) {
  await channel.assertExchange('events', 'topic', { durable: true })
  const queue = await channel.assertQueue('', { exclusive: true })
  await channel.bindQueue(queue.queue, 'events', routingKey)
  
  await channel.consume(queue.queue, (msg) => {
    if (msg) {
      const data = JSON.parse(msg.content.toString())
      handler(data)
      channel.ack(msg)
    }
  })
}

RPC Pattern

typescript
import { randomUUID } from 'node:crypto'

// RPC Server
async function setupRPCServer() {
  await channel.assertQueue('rpc_queue', { durable: false })
  await channel.prefetch(1)

  await channel.consume('rpc_queue', async (msg) => {
    if (msg) {
      const request = JSON.parse(msg.content.toString())
      const response = await processRequest(request)

      // Reply on the queue named by the client, echoing its correlation ID
      channel.sendToQueue(
        msg.properties.replyTo,
        Buffer.from(JSON.stringify(response)),
        {
          correlationId: msg.properties.correlationId,
        }
      )

      channel.ack(msg)
    }
  })
}

// RPC Client
async function rpcCall(request: any): Promise<any> {
  // Temporary, server-named reply queue for this call
  const queue = await channel.assertQueue('', { exclusive: true })
  const correlationId = randomUUID()

  return new Promise((resolve, reject) => {
    const timeout = setTimeout(() => {
      channel.deleteQueue(queue.queue).catch(() => {})
      reject(new Error('RPC timeout'))
    }, 10000)

    channel.consume(queue.queue, (msg) => {
      // Ignore replies that do not belong to this request
      if (msg && msg.properties.correlationId === correlationId) {
        clearTimeout(timeout)
        const response = JSON.parse(msg.content.toString())
        channel.deleteQueue(queue.queue).catch(() => {})
        resolve(response)
      }
    }, { noAck: true }).catch(reject)

    channel.sendToQueue('rpc_queue', Buffer.from(JSON.stringify(request)), {
      correlationId,
      replyTo: queue.queue,
    })
  })
}
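
The client above declares and deletes a reply queue for every call. A common alternative is one long-lived reply queue per client, with replies routed back to callers by correlation ID. The sketch below shows only that bookkeeping. The class `PendingReplies` is a hypothetical name for this example and does not talk to RabbitMQ itself.

```typescript
// Hypothetical bookkeeping for RPC replies on a shared reply queue:
// maps each correlation ID to the callback waiting for that reply.
class PendingReplies<T> {
  private readonly waiting = new Map<string, (reply: T) => void>()

  // Call before sending the request.
  register(correlationId: string, onReply: (reply: T) => void): void {
    this.waiting.set(correlationId, onReply)
  }

  // Call from the reply-queue consumer. Returns false for unknown IDs
  // (late, duplicate, or foreign replies), which are simply ignored.
  dispatch(correlationId: string, reply: T): boolean {
    const onReply = this.waiting.get(correlationId)
    if (!onReply) return false
    this.waiting.delete(correlationId)
    onReply(reply)
    return true
  }

  // Call when a request times out so the entry does not leak.
  cancel(correlationId: string): boolean {
    return this.waiting.delete(correlationId)
  }

  get size(): number {
    return this.waiting.size
  }
}
```

In the reply consumer this would be used as `pending.dispatch(msg.properties.correlationId, JSON.parse(msg.content.toString()))`, with `register` wrapping the `resolve` of the promise returned to the caller and `cancel` called from the timeout.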

Best Practices

DO:

  • Use durable queues and exchanges for important messages
  • Set message persistence for critical messages
  • Use manual acknowledgment (noAck: false)
  • Implement proper error handling
  • Use prefetch to control message distribution
  • Monitor queue lengths
  • Use dead letter exchanges for failed messages
  • Implement connection retry logic
  • Use appropriate exchange types
  • Set message TTL when needed

DON'T:

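  • Forget to acknowledge in manual-ack mode (unacked messages pile up and are redelivered when the channel closes)
  • Use auto-ack for critical messages (they are lost if the consumer crashes mid-processing)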
  • Ignore connection errors
  • Hardcode connection strings
  • Send large payloads through the broker (store them externally and send a reference)
  • Skip error handling
  • Ignore queue monitoring
  • Use non-durable queues for important data
  • Skip message persistence
  • Ignore memory limits
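
Several items in the lists above (durable queues, dead letter exchanges, message TTL) come down to the options passed when a queue is declared. The sketch below builds such an options object. The field names follow amqplib's `assertQueue` options, while the local `QueueOptions` interface and the function `criticalQueueOptions` are hypothetical names for this example.

```typescript
// Local mirror of the amqplib assertQueue option names used in this sketch.
interface QueueOptions {
  durable: boolean
  deadLetterExchange?: string
  messageTtl?: number
}

// Hypothetical helper: options for a queue holding important messages.
function criticalQueueOptions(deadLetterExchange: string, ttlMs?: number): QueueOptions {
  if (ttlMs !== undefined && (!Number.isInteger(ttlMs) || ttlMs < 0)) {
    throw new RangeError(`ttlMs must be a non-negative integer, got ${ttlMs}`)
  }
  // durable: the queue definition survives a broker restart.
  // deadLetterExchange: rejected or expired messages are republished there.
  const options: QueueOptions = { durable: true, deadLetterExchange }
  // messageTtl: messages older than this many milliseconds expire.
  if (ttlMs !== undefined) options.messageTtl = ttlMs
  return options
}
```

A call such as `channel.assertQueue('tasks', criticalQueueOptions('tasks.dlx', 60000))` assumes an exchange named `tasks.dlx` (an illustrative name) has been declared. Redeclaring an existing queue with different arguments fails, so these options need to be chosen before the queue is first created.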

Configuration

Environment Variables

bash
# Option 1: a single URL (set only one of these)
RABBITMQ_URL=amqp://localhost:5672
# RABBITMQ_URL=amqp://user:password@host:5672/vhost

# Option 2: individual settings
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
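
When both the URL and the individual variables may be present, the application has to pick one. The sketch below assumes that `RABBITMQ_URL` takes precedence, which is a design choice of this example rather than something RabbitMQ or amqplib prescribes. The function `resolveAmqpTarget` is a hypothetical name, and its return value has the shape accepted by `amqp.connect`.

```typescript
type Env = Record<string, string | undefined>

interface ConnectOptions {
  protocol: 'amqp'
  hostname: string
  port: number
  username: string
  password: string
  vhost: string
}

// Hypothetical helper: RABBITMQ_URL wins if set, otherwise the individual
// variables are used with the same defaults as elsewhere in this document.
function resolveAmqpTarget(env: Env): string | ConnectOptions {
  const url = env.RABBITMQ_URL
  if (url) {
    if (!/^amqps?:\/\//.test(url)) {
      throw new Error('RABBITMQ_URL must start with amqp:// or amqps://')
    }
    return url
  }
  return {
    protocol: 'amqp',
    hostname: env.RABBITMQ_HOST || 'localhost',
    port: Number.parseInt(env.RABBITMQ_PORT || '5672', 10),
    username: env.RABBITMQ_USER || 'guest',
    password: env.RABBITMQ_PASSWORD || 'guest',
    vhost: env.RABBITMQ_VHOST || '/',
  }
}
```

In application code this would be called as `amqp.connect(resolveAmqpTarget(process.env))`.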

Docker Compose

yaml
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: securepassword
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:

Integration with Development

Testing

typescript
// Use test connection
const testConnection = await amqp.connect('amqp://localhost:5673') // Different port

// Clean up after tests
afterEach(async () => {
  // Delete test queues/exchanges or use separate vhost
})

Health Checks

typescript
async function checkRabbitMQHealth(): Promise<boolean> {
  try {
    // Fall back to the local default so connect() never receives undefined
    const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost:5672')
    await connection.close()
    return true
  } catch {
    return false
  }
}
