Agent skill
rabbitmq-master
Ultimate RabbitMQ expertise skill for production-grade message broker architecture, implementation, and operations. Top 0.01% knowledge covering: (1) Advanced messaging patterns - Dead Letter Exchanges, Delayed Messages, Priority Queues, Consistent Hash Exchange, Sharding, (2) High Availability - Clustering, Quorum Queues, Stream Queues, Federation, Shovel, (3) Performance Engineering - prefetch tuning, connection pooling, batch publishing, memory optimization, flow control, (4) Security - TLS/mTLS, OAuth2, LDAP, certificate rotation, (5) Monitoring - Prometheus metrics, custom health checks, anomaly detection, (6) Troubleshooting - memory alarms, network partitions, queue backlogs, consumer starvation, (7) Multi-tenancy - vhost design, resource limits, isolation patterns, (8) Event-driven architectures - CQRS, Event Sourcing, Saga patterns with RabbitMQ. Use when: building messaging systems, debugging RabbitMQ issues, optimizing performance, designing HA architectures, implementing advanced patterns, production hardening, capacity planning, migration strategies.
Install this agent skill to your Project
npx add-skill https://github.com/mOdrA40/claude-codex-skills-directory/tree/main/data-skills/rabbitmq-mastery-skill
SKILL.md
RabbitMQ Master Skill
Expert-level RabbitMQ knowledge for building bulletproof messaging systems.
Quick Reference
Connection Best Practices
# WRONG - Connection per message (kills performance)
def send_bad(msg):
conn = pika.BlockingConnection(params) # 7-way TCP handshake + AMQP handshake
ch = conn.channel()
ch.basic_publish(...)
conn.close()
# CORRECT - Connection pooling with heartbeat
import pika
from pika import ConnectionParameters, PlainCredentials
params = ConnectionParameters(
host='rabbitmq.prod',
port=5672,
credentials=PlainCredentials('user', 'pass'),
heartbeat=60, # Detect dead connections
blocked_connection_timeout=300, # Handle flow control
connection_attempts=3,
retry_delay=5,
socket_timeout=10,
stack_timeout=15,
# CRITICAL: TCP keepalive untuk cloud/NAT environments
tcp_options={'TCP_KEEPIDLE': 60, 'TCP_KEEPINTVL': 10, 'TCP_KEEPCNT': 3}
)
# Use connection pool - see scripts/connection_pool.py
Channel Best Practices
# Channels are NOT thread-safe - use 1 channel per thread
# Channels are cheap - create many, but not per message
# OPTIMAL: Dedicated channels per purpose
publish_channel = conn.channel()
publish_channel.confirm_delivery() # Enable publisher confirms
consume_channel = conn.channel()
consume_channel.basic_qos(prefetch_count=50) # Tuned prefetch
Core Patterns
1. Reliable Publishing (Publisher Confirms)
# Synchronous confirms (simple, slower)
channel.confirm_delivery()
try:
channel.basic_publish(
exchange='orders',
routing_key='new',
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json',
message_id=str(uuid4()), # Idempotency key
timestamp=int(time.time()),
headers={'retry_count': 0}
),
mandatory=True # Return if unroutable
)
except pika.exceptions.UnroutableError:
handle_unroutable()
except pika.exceptions.NackError:
handle_nack()
# Asynchronous confirms (complex, 10x faster) - see scripts/async_publisher.py
2. Reliable Consuming
def callback(ch, method, properties, body):
try:
# ALWAYS process idempotently using message_id
if is_duplicate(properties.message_id):
ch.basic_ack(method.delivery_tag)
return
process_message(body)
mark_processed(properties.message_id)
ch.basic_ack(method.delivery_tag)
except RecoverableError as e:
# Requeue with exponential backoff via DLX
retry_count = (properties.headers or {}).get('retry_count', 0)
if retry_count < MAX_RETRIES:
republish_with_delay(ch, body, retry_count + 1)
ch.basic_ack(method.delivery_tag) # Ack original
else:
ch.basic_nack(method.delivery_tag, requeue=False) # To DLQ
except FatalError:
# Permanent failure - dead letter immediately
ch.basic_nack(method.delivery_tag, requeue=False)
channel.basic_qos(prefetch_count=50) # CRITICAL - tune this!
channel.basic_consume(queue='orders', on_message_callback=callback)
3. Dead Letter Exchange Pattern
# DLX captures: rejected, expired, queue-full messages
channel.exchange_declare('dlx.exchange', 'direct', durable=True)
channel.queue_declare('dlq.orders', durable=True)
channel.queue_bind('dlq.orders', 'dlx.exchange', 'orders')
# Main queue with DLX
channel.queue_declare(
'orders',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx.exchange',
'x-dead-letter-routing-key': 'orders',
'x-message-ttl': 86400000, # 24h max age
'x-max-length': 1000000, # Max 1M messages
'x-overflow': 'reject-publish-dlx' # DLX on overflow
}
)
4. Delayed/Scheduled Messages
# Method 1: Plugin (rabbitmq_delayed_message_exchange)
channel.exchange_declare(
'delayed.exchange',
'x-delayed-message',
arguments={'x-delayed-type': 'direct'}
)
channel.basic_publish(
exchange='delayed.exchange',
routing_key='scheduled',
body=payload,
properties=pika.BasicProperties(
headers={'x-delay': 60000} # 60 seconds delay
)
)
# Method 2: TTL + DLX chain (no plugin needed) - see references/patterns.md
5. Priority Queues
# CAUTION: Priority queues have overhead, use sparingly
channel.queue_declare(
'priority.orders',
durable=True,
arguments={
'x-max-priority': 10, # 1-10 priorities, keep low!
'x-queue-type': 'classic' # Not supported on quorum
}
)
# Publishing with priority
channel.basic_publish(
exchange='',
routing_key='priority.orders',
body=payload,
properties=pika.BasicProperties(
delivery_mode=2,
priority=8 # Higher = more important
)
)
High Availability
Quorum Queues (Recommended for HA)
# Raft-based replication - ALWAYS use for critical queues
channel.queue_declare(
'orders.quorum',
durable=True,
arguments={
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3, # Replicas
'x-delivery-limit': 5, # Auto-DLQ after 5 redeliveries
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-strategy': 'at-least-once' # Safe DLQ
}
)
Stream Queues (High-throughput, replay)
# Kafka-like streams in RabbitMQ 3.9+
channel.queue_declare(
'events.stream',
durable=True,
arguments={
'x-queue-type': 'stream',
'x-max-length-bytes': 20_000_000_000, # 20GB retention
'x-max-age': '7D', # 7 days retention
'x-stream-max-segment-size-bytes': 500_000_000
}
)
# Consuming from offset
channel.basic_qos(prefetch_count=100)
channel.basic_consume(
'events.stream',
callback,
arguments={
'x-stream-offset': 'first' # first|last|next|timestamp|offset
}
)
Performance Tuning
Prefetch Optimization Formula
optimal_prefetch = (avg_processing_time_ms / avg_network_rtt_ms) * consumer_count * 1.5
Examples:
- Same datacenter (1ms RTT), 50ms processing, 1 consumer: (50/1) * 1 * 1.5 = 75
- Cross-region (50ms RTT), 50ms processing, 1 consumer: (50/50) * 1 * 1.5 = 2
- Batch processing (500ms), local: (500/1) * 1 * 1.5 = 750
Batch Publishing (10x throughput)
# Single publish: ~2000 msg/s
# Batch publish: ~20000+ msg/s
def batch_publish(channel, messages, batch_size=100):
channel.confirm_delivery()
for i in range(0, len(messages), batch_size):
batch = messages[i:i+batch_size]
for msg in batch:
channel.basic_publish(
exchange='batch.exchange',
routing_key=msg['key'],
body=msg['body'],
properties=pika.BasicProperties(delivery_mode=2)
)
# Confirm entire batch
channel.wait_for_confirms(timeout=30)
Memory Management
%% rabbitmq.conf - Production settings
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75
disk_free_limit.absolute = 5GB
%% Queue memory limits
queue_index_embed_msgs_below = 4096
lazy_queue_explicit_gc_run_operation_threshold = 1000
%% Flow control tuning
credit_flow_default_credit = {400, 200}
Monitoring & Alerting
Critical Metrics
# Prometheus alerts - see references/monitoring.md for full config
- alert: RabbitMQHighMemory
expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.8
- alert: RabbitMQQueueBacklog
expr: rabbitmq_queue_messages_ready > 100000
- alert: RabbitMQConsumerUtilization
expr: rabbitmq_queue_consumer_utilisation < 0.5 # Consumers idle = problem
- alert: RabbitMQUnackedMessages
expr: rabbitmq_queue_messages_unacknowledged > 10000
- alert: RabbitMQDiskAlarm
expr: rabbitmq_alarms_free_disk_space_watermark == 1
Health Check Script
# See scripts/health_check.sh for complete implementation
rabbitmqctl node_health_check
rabbitmqctl cluster_status
rabbitmq-diagnostics check_port_connectivity
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_local_alarms
Anti-Patterns to Avoid
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Connection per message | 1000x overhead | Connection pool |
| No prefetch (unlimited) | Memory explosion | Tune prefetch_count |
auto_ack=True |
Message loss | Manual ack after processing |
| Classic queues for HA | Split-brain risk | Use Quorum queues |
| Polling with basic_get | CPU waste, latency | Use basic_consume |
| Giant messages (>128KB) | Memory pressure | External storage + reference |
| No message TTL | Queue bloat | Set x-message-ttl |
| Unbounded queue growth | Disk/memory full | x-max-length + overflow policy |
File Reference
scripts/connection_pool.py- Production-grade connection poolingscripts/async_publisher.py- High-throughput async publisher with confirmsscripts/consumer_template.py- Robust consumer with retry logicscripts/health_check.sh- Comprehensive health check scriptscripts/queue_migrate.py- Zero-downtime queue migration toolscripts/dlq_processor.py- Dead letter queue reprocessingreferences/patterns.md- Advanced messaging patterns deep-divereferences/clustering.md- HA clustering configurationreferences/security.md- Security hardening guidereferences/monitoring.md- Full monitoring setupreferences/troubleshooting.md- Problem diagnosis guidereferences/performance.md- Performance tuning deep-diveassets/rabbitmq.conf- Production configuration templateassets/docker-compose.yml- Development cluster setupassets/k8s/- Kubernetes deployment manifests
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
nuxt-tanstack-mastery
Panduan senior/lead developer 20 tahun pengalaman untuk Vue.js 3 + Nuxt 3 + TanStack Query development. Gunakan skill ini ketika: (1) Membuat project Nuxt 3 baru dengan arsitektur production-ready, (2) Integrasi TanStack Query untuk data fetching, (3) Debugging Vue/Nuxt yang kompleks, (4) Review code untuk clean code compliance, (5) Optimisasi performa aplikasi Vue/Nuxt, (6) Setup folder structure yang scalable, (7) Mencari library terpercaya untuk Vue ecosystem, (8) Menghindari common pitfalls dan bugs, (9) Implementasi state management patterns, (10) Security hardening aplikasi Nuxt. Trigger keywords: vue, vuejs, nuxt, nuxtjs, tanstack, vue-query, composition api, pinia, vueuse, vue router, clean code vue, debugging vue, folder structure nuxt.
solidjs-solidstart-expert
Expert-level SolidJS and SolidStart development skill with 20+ years senior/lead engineer mindset. Comprehensive guidance for building production-ready, scalable web applications with fine-grained reactivity. Use when Claude needs to: (1) Create new SolidJS/SolidStart projects, (2) Implement TanStack Query/Router/Table/Form integration, (3) Build reactive components with signals/stores/resources, (4) Handle SSR/SSG/streaming with SolidStart, (5) Implement authentication and API routes, (6) Optimize bundle size and performance, (7) Debug reactivity issues and memory leaks, (8) Structure large-scale applications, (9) Implement type-safe patterns with TypeScript, (10) Handle error boundaries and suspense, (11) Build accessible UI components, (12) Deploy to Vercel/Netlify/Cloudflare. Triggers: "solid", "solidjs", "solidstart", "createSignal", "createStore", "createResource", "tanstack solid", "vinxi", "fine-grained reactivity".
react-tanstack-senior
Expertise senior/lead React developer 20 tahun dengan TanStack ecosystem (Query, Router, Table, Form, Start). Gunakan skill ini ketika: (1) Membuat aplikasi React dengan TanStack libraries, (2) Review/refactor kode React untuk clean code, (3) Debugging React/TanStack issues, (4) Setup project structure yang maintainable, (5) Optimasi performa React apps, (6) Memilih library yang tepat untuk use case tertentu, (7) Mencegah common bugs dan memory leaks, (8) Implementasi best practices KISS dan less is more. Trigger keywords: React, TanStack, React Query, TanStack Router, TanStack Table, TanStack Form, TanStack Start, Vinxi, clean code, refactor, performance, debugging.
clickhouse-principal-engineer
Principal/Senior-level ClickHouse playbook for analytical schema design, partitioning, ingestion, query performance, replication, storage strategy, and operating large-scale columnar systems. Use when: designing OLAP workloads, reviewing MergeTree layout, tuning analytical queries, building event analytics platforms, or operating ClickHouse in production.
mysql-principal-engineer
Principal/Senior-level MySQL playbook for schema design, indexing, transactions, replication, operational reliability, online migrations, and production workload tuning. Use when: designing relational systems, reviewing query/index strategy, operating MySQL fleets, debugging contention or replication lag, or hardening MySQL-backed applications.
mongodb-principal-engineer
Principal/Senior-level MongoDB playbook for document modeling, indexing, replication, sharding, query design, observability, and production reliability. Use when: designing document schemas, reviewing aggregation/query performance, operating replicas/shards, or hardening MongoDB-backed systems.
Didn't find tool you were looking for?