Agent skill

rabbitmq-master

Ultimate RabbitMQ expertise skill for production-grade message broker architecture, implementation, and operations. Top 0.01% knowledge covering: (1) Advanced messaging patterns - Dead Letter Exchanges, Delayed Messages, Priority Queues, Consistent Hash Exchange, Sharding, (2) High Availability - Clustering, Quorum Queues, Stream Queues, Federation, Shovel, (3) Performance Engineering - prefetch tuning, connection pooling, batch publishing, memory optimization, flow control, (4) Security - TLS/mTLS, OAuth2, LDAP, certificate rotation, (5) Monitoring - Prometheus metrics, custom health checks, anomaly detection, (6) Troubleshooting - memory alarms, network partitions, queue backlogs, consumer starvation, (7) Multi-tenancy - vhost design, resource limits, isolation patterns, (8) Event-driven architectures - CQRS, Event Sourcing, Saga patterns with RabbitMQ. Use when: building messaging systems, debugging RabbitMQ issues, optimizing performance, designing HA architectures, implementing advanced patterns, production hardening, capacity planning, migration strategies.

Stars 4
Forks 0

Install this agent skill to your Project

npx add-skill https://github.com/mOdrA40/claude-codex-skills-directory/tree/main/data-skills/rabbitmq-mastery-skill

SKILL.md

RabbitMQ Master Skill

Expert-level RabbitMQ knowledge for building bulletproof messaging systems.

Quick Reference

Connection Best Practices

python
# WRONG - Connection per message (kills performance)
def send_bad(msg):
    conn = pika.BlockingConnection(params)  # 7-way TCP handshake + AMQP handshake
    ch = conn.channel()
    ch.basic_publish(...)
    conn.close()

# CORRECT - Connection pooling with heartbeat
import pika
from pika import ConnectionParameters, PlainCredentials

params = ConnectionParameters(
    host='rabbitmq.prod',
    port=5672,
    credentials=PlainCredentials('user', 'pass'),
    heartbeat=60,                    # Detect dead connections
    blocked_connection_timeout=300,  # Handle flow control
    connection_attempts=3,
    retry_delay=5,
    socket_timeout=10,
    stack_timeout=15,
    # CRITICAL: TCP keepalive untuk cloud/NAT environments
    tcp_options={'TCP_KEEPIDLE': 60, 'TCP_KEEPINTVL': 10, 'TCP_KEEPCNT': 3}
)

# Use connection pool - see scripts/connection_pool.py

Channel Best Practices

python
# Channels are NOT thread-safe - use 1 channel per thread
# Channels are cheap - create many, but not per message

# OPTIMAL: Dedicated channels per purpose
publish_channel = conn.channel()
publish_channel.confirm_delivery()  # Enable publisher confirms

consume_channel = conn.channel()
consume_channel.basic_qos(prefetch_count=50)  # Tuned prefetch

Core Patterns

1. Reliable Publishing (Publisher Confirms)

python
# Synchronous confirms (simple, slower)
channel.confirm_delivery()
try:
    channel.basic_publish(
        exchange='orders',
        routing_key='new',
        body=json.dumps(order),
        properties=pika.BasicProperties(
            delivery_mode=2,           # Persistent
            content_type='application/json',
            message_id=str(uuid4()),   # Idempotency key
            timestamp=int(time.time()),
            headers={'retry_count': 0}
        ),
        mandatory=True  # Return if unroutable
    )
except pika.exceptions.UnroutableError:
    handle_unroutable()
except pika.exceptions.NackError:
    handle_nack()

# Asynchronous confirms (complex, 10x faster) - see scripts/async_publisher.py

2. Reliable Consuming

python
def callback(ch, method, properties, body):
    try:
        # ALWAYS process idempotently using message_id
        if is_duplicate(properties.message_id):
            ch.basic_ack(method.delivery_tag)
            return
        
        process_message(body)
        mark_processed(properties.message_id)
        ch.basic_ack(method.delivery_tag)
        
    except RecoverableError as e:
        # Requeue with exponential backoff via DLX
        retry_count = (properties.headers or {}).get('retry_count', 0)
        if retry_count < MAX_RETRIES:
            republish_with_delay(ch, body, retry_count + 1)
            ch.basic_ack(method.delivery_tag)  # Ack original
        else:
            ch.basic_nack(method.delivery_tag, requeue=False)  # To DLQ
            
    except FatalError:
        # Permanent failure - dead letter immediately
        ch.basic_nack(method.delivery_tag, requeue=False)

channel.basic_qos(prefetch_count=50)  # CRITICAL - tune this!
channel.basic_consume(queue='orders', on_message_callback=callback)

3. Dead Letter Exchange Pattern

python
# DLX captures: rejected, expired, queue-full messages
channel.exchange_declare('dlx.exchange', 'direct', durable=True)
channel.queue_declare('dlq.orders', durable=True)
channel.queue_bind('dlq.orders', 'dlx.exchange', 'orders')

# Main queue with DLX
channel.queue_declare(
    'orders',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx.exchange',
        'x-dead-letter-routing-key': 'orders',
        'x-message-ttl': 86400000,     # 24h max age
        'x-max-length': 1000000,       # Max 1M messages
        'x-overflow': 'reject-publish-dlx'  # DLX on overflow
    }
)

4. Delayed/Scheduled Messages

python
# Method 1: Plugin (rabbitmq_delayed_message_exchange)
channel.exchange_declare(
    'delayed.exchange',
    'x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

channel.basic_publish(
    exchange='delayed.exchange',
    routing_key='scheduled',
    body=payload,
    properties=pika.BasicProperties(
        headers={'x-delay': 60000}  # 60 seconds delay
    )
)

# Method 2: TTL + DLX chain (no plugin needed) - see references/patterns.md

5. Priority Queues

python
# CAUTION: Priority queues have overhead, use sparingly
channel.queue_declare(
    'priority.orders',
    durable=True,
    arguments={
        'x-max-priority': 10,  # 1-10 priorities, keep low!
        'x-queue-type': 'classic'  # Not supported on quorum
    }
)

# Publishing with priority
channel.basic_publish(
    exchange='',
    routing_key='priority.orders',
    body=payload,
    properties=pika.BasicProperties(
        delivery_mode=2,
        priority=8  # Higher = more important
    )
)

High Availability

Quorum Queues (Recommended for HA)

python
# Raft-based replication - ALWAYS use for critical queues
channel.queue_declare(
    'orders.quorum',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,  # Replicas
        'x-delivery-limit': 5,             # Auto-DLQ after 5 redeliveries
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-strategy': 'at-least-once'  # Safe DLQ
    }
)

Stream Queues (High-throughput, replay)

python
# Kafka-like streams in RabbitMQ 3.9+
channel.queue_declare(
    'events.stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-length-bytes': 20_000_000_000,  # 20GB retention
        'x-max-age': '7D',                      # 7 days retention
        'x-stream-max-segment-size-bytes': 500_000_000
    }
)

# Consuming from offset
channel.basic_qos(prefetch_count=100)
channel.basic_consume(
    'events.stream',
    callback,
    arguments={
        'x-stream-offset': 'first'  # first|last|next|timestamp|offset
    }
)

Performance Tuning

Prefetch Optimization Formula

optimal_prefetch = (avg_processing_time_ms / avg_network_rtt_ms) * consumer_count * 1.5

Examples:
- Same datacenter (1ms RTT), 50ms processing, 1 consumer: (50/1) * 1 * 1.5 = 75
- Cross-region (50ms RTT), 50ms processing, 1 consumer: (50/50) * 1 * 1.5 = 2
- Batch processing (500ms), local: (500/1) * 1 * 1.5 = 750

Batch Publishing (10x throughput)

python
# Single publish: ~2000 msg/s
# Batch publish: ~20000+ msg/s

def batch_publish(channel, messages, batch_size=100):
    channel.confirm_delivery()
    
    for i in range(0, len(messages), batch_size):
        batch = messages[i:i+batch_size]
        for msg in batch:
            channel.basic_publish(
                exchange='batch.exchange',
                routing_key=msg['key'],
                body=msg['body'],
                properties=pika.BasicProperties(delivery_mode=2)
            )
        # Confirm entire batch
        channel.wait_for_confirms(timeout=30)

Memory Management

erlang
%% rabbitmq.conf - Production settings
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75
disk_free_limit.absolute = 5GB

%% Queue memory limits
queue_index_embed_msgs_below = 4096
lazy_queue_explicit_gc_run_operation_threshold = 1000

%% Flow control tuning
credit_flow_default_credit = {400, 200}

Monitoring & Alerting

Critical Metrics

yaml
# Prometheus alerts - see references/monitoring.md for full config
- alert: RabbitMQHighMemory
  expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.8
  
- alert: RabbitMQQueueBacklog
  expr: rabbitmq_queue_messages_ready > 100000
  
- alert: RabbitMQConsumerUtilization
  expr: rabbitmq_queue_consumer_utilisation < 0.5  # Consumers idle = problem
  
- alert: RabbitMQUnackedMessages
  expr: rabbitmq_queue_messages_unacknowledged > 10000
  
- alert: RabbitMQDiskAlarm
  expr: rabbitmq_alarms_free_disk_space_watermark == 1

Health Check Script

bash
# See scripts/health_check.sh for complete implementation
rabbitmqctl node_health_check
rabbitmqctl cluster_status
rabbitmq-diagnostics check_port_connectivity
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_local_alarms

Anti-Patterns to Avoid

Anti-Pattern Problem Solution
Connection per message 1000x overhead Connection pool
No prefetch (unlimited) Memory explosion Tune prefetch_count
auto_ack=True Message loss Manual ack after processing
Classic queues for HA Split-brain risk Use Quorum queues
Polling with basic_get CPU waste, latency Use basic_consume
Giant messages (>128KB) Memory pressure External storage + reference
No message TTL Queue bloat Set x-message-ttl
Unbounded queue growth Disk/memory full x-max-length + overflow policy

File Reference

  • scripts/connection_pool.py - Production-grade connection pooling
  • scripts/async_publisher.py - High-throughput async publisher with confirms
  • scripts/consumer_template.py - Robust consumer with retry logic
  • scripts/health_check.sh - Comprehensive health check script
  • scripts/queue_migrate.py - Zero-downtime queue migration tool
  • scripts/dlq_processor.py - Dead letter queue reprocessing
  • references/patterns.md - Advanced messaging patterns deep-dive
  • references/clustering.md - HA clustering configuration
  • references/security.md - Security hardening guide
  • references/monitoring.md - Full monitoring setup
  • references/troubleshooting.md - Problem diagnosis guide
  • references/performance.md - Performance tuning deep-dive
  • assets/rabbitmq.conf - Production configuration template
  • assets/docker-compose.yml - Development cluster setup
  • assets/k8s/ - Kubernetes deployment manifests

Expand your agent's capabilities with these related and highly-rated skills.

mOdrA40/claude-codex-skills-directory

nuxt-tanstack-mastery

Panduan senior/lead developer 20 tahun pengalaman untuk Vue.js 3 + Nuxt 3 + TanStack Query development. Gunakan skill ini ketika: (1) Membuat project Nuxt 3 baru dengan arsitektur production-ready, (2) Integrasi TanStack Query untuk data fetching, (3) Debugging Vue/Nuxt yang kompleks, (4) Review code untuk clean code compliance, (5) Optimisasi performa aplikasi Vue/Nuxt, (6) Setup folder structure yang scalable, (7) Mencari library terpercaya untuk Vue ecosystem, (8) Menghindari common pitfalls dan bugs, (9) Implementasi state management patterns, (10) Security hardening aplikasi Nuxt. Trigger keywords: vue, vuejs, nuxt, nuxtjs, tanstack, vue-query, composition api, pinia, vueuse, vue router, clean code vue, debugging vue, folder structure nuxt.

4 0
Explore
mOdrA40/claude-codex-skills-directory

solidjs-solidstart-expert

Expert-level SolidJS and SolidStart development skill with 20+ years senior/lead engineer mindset. Comprehensive guidance for building production-ready, scalable web applications with fine-grained reactivity. Use when Claude needs to: (1) Create new SolidJS/SolidStart projects, (2) Implement TanStack Query/Router/Table/Form integration, (3) Build reactive components with signals/stores/resources, (4) Handle SSR/SSG/streaming with SolidStart, (5) Implement authentication and API routes, (6) Optimize bundle size and performance, (7) Debug reactivity issues and memory leaks, (8) Structure large-scale applications, (9) Implement type-safe patterns with TypeScript, (10) Handle error boundaries and suspense, (11) Build accessible UI components, (12) Deploy to Vercel/Netlify/Cloudflare. Triggers: "solid", "solidjs", "solidstart", "createSignal", "createStore", "createResource", "tanstack solid", "vinxi", "fine-grained reactivity".

4 0
Explore
mOdrA40/claude-codex-skills-directory

react-tanstack-senior

Expertise senior/lead React developer 20 tahun dengan TanStack ecosystem (Query, Router, Table, Form, Start). Gunakan skill ini ketika: (1) Membuat aplikasi React dengan TanStack libraries, (2) Review/refactor kode React untuk clean code, (3) Debugging React/TanStack issues, (4) Setup project structure yang maintainable, (5) Optimasi performa React apps, (6) Memilih library yang tepat untuk use case tertentu, (7) Mencegah common bugs dan memory leaks, (8) Implementasi best practices KISS dan less is more. Trigger keywords: React, TanStack, React Query, TanStack Router, TanStack Table, TanStack Form, TanStack Start, Vinxi, clean code, refactor, performance, debugging.

4 0
Explore
mOdrA40/claude-codex-skills-directory

clickhouse-principal-engineer

Principal/Senior-level ClickHouse playbook for analytical schema design, partitioning, ingestion, query performance, replication, storage strategy, and operating large-scale columnar systems. Use when: designing OLAP workloads, reviewing MergeTree layout, tuning analytical queries, building event analytics platforms, or operating ClickHouse in production.

4 0
Explore
mOdrA40/claude-codex-skills-directory

mysql-principal-engineer

Principal/Senior-level MySQL playbook for schema design, indexing, transactions, replication, operational reliability, online migrations, and production workload tuning. Use when: designing relational systems, reviewing query/index strategy, operating MySQL fleets, debugging contention or replication lag, or hardening MySQL-backed applications.

4 0
Explore
mOdrA40/claude-codex-skills-directory

mongodb-principal-engineer

Principal/Senior-level MongoDB playbook for document modeling, indexing, replication, sharding, query design, observability, and production reliability. Use when: designing document schemas, reviewing aggregation/query performance, operating replicas/shards, or hardening MongoDB-backed systems.

4 0
Explore

Didn't find tool you were looking for?

Be as detailed as possible for better results