Messaging & Event Systems

Messaging and event-driven architecture. Activate when: (1) Working with NATS pub/sub, (2) Configuring Temporal workflows, (3) Implementing event sourcing, (4) Setting up message queues, or (5) Designing async communication patterns.

Install this agent skill to your Project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/messaging

Overview

This skill covers messaging systems (NATS), workflow orchestration (Temporal), and event-driven architecture patterns.

NATS

Core Concepts

Concept       Description
Subject       Message address/topic (e.g., orders.created)
Publisher     Sends messages to subjects
Subscriber    Receives messages from subjects
Queue Group   Load-balanced message distribution
JetStream     Persistence and streaming layer
KV Store      Key-value storage built on JetStream
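
Subjects are dot-separated tokens, and subscriptions may use two wildcards: `*` matches exactly one token, and `>` matches one or more trailing tokens. The sketch below illustrates those matching rules in plain Python. The function name `subject_matches` is hypothetical and is not part of any NATS client; the server does the real matching.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subscription pattern matches a subject."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")

    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # '>' must be the last token and needs at least one token to match.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)


print(subject_matches("orders.>", "orders.eu.created"))
print(subject_matches("orders.*", "orders.eu.created"))
```

Under these rules a subscription to `orders.>` receives `orders.created` and `orders.eu.created`, while `orders.*` receives only the former.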

NATS CLI

bash
# Connect
nats context add local --server nats://localhost:4222
nats context select local

# Pub/Sub
nats pub orders.created '{"id": 123}'
nats sub 'orders.>'                    # Wildcard subscription

# Request/Reply
nats reply 'service.ping' 'pong'       # In terminal 1
nats request 'service.ping' ''         # In terminal 2

# JetStream
nats stream add ORDERS --subjects "orders.>" --retention limits
nats stream info ORDERS
nats consumer add ORDERS processor --pull --ack explicit

# KV Store
nats kv add CONFIG
nats kv put CONFIG app.setting "value"
nats kv get CONFIG app.setting
nats kv watch CONFIG

NATS Server Configuration

hcl
# nats.conf
port: 4222

jetstream {
  store_dir: /data/jetstream
  max_mem: 1G
  max_file: 10G
}

cluster {
  name: my-cluster
  port: 6222
  routes: [
    nats-route://nats-1:6222
    nats-route://nats-2:6222
  ]
}

NATS Client (Rust)

rust
use futures::StreamExt; // provides `next()` on the subscriber stream

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("nats://localhost:4222").await?;

    // Subscribe first so the message published below reaches this client
    let mut subscriber = client.subscribe("events.>").await?;

    // Publish
    client.publish("events.user.created", "user data".into()).await?;

    // Process messages until the subscription or connection closes
    while let Some(message) = subscriber.next().await {
        println!("Received: {:?}", message);
    }

    Ok(())
}

NATS Client (Python)

python
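import asyncio
import nats
from nats.errors import NoRespondersError, TimeoutError as NatsTimeoutError

async def main():
    nc = await nats.connect("nats://localhost:4222")

    # Subscribe
    async def message_handler(msg):
        print(f"Received: {msg.subject}: {msg.data.decode()}")

    await nc.subscribe("events.>", cb=message_handler)

    # Publish
    await nc.publish("events.user.created", b'{"user_id": 123}')

    # Request/Reply (needs a responder on service.ping, e.g. `nats reply`)
    try:
        response = await nc.request("service.ping", b'', timeout=1)
        print(f"Response: {response.data.decode()}")
    except (NoRespondersError, NatsTimeoutError) as exc:
        print(f"No reply: {exc!r}")

    # Flush pending messages, then close the connection
    await nc.drain()

asyncio.run(main())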

Temporal

Core Concepts

Concept      Description
Workflow     Durable, long-running business process
Activity     A single unit of work (can fail/retry)
Worker       Executes workflows and activities
Task Queue   Routes work to workers
Signal       External event sent to a running workflow
Query        Reads workflow state without affecting it

Workflow Definition (Python)

python
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta

@activity.defn
async def send_email(to: str, subject: str) -> str:
    # Actual email sending logic
    return f"Email sent to {to}"

@activity.defn
async def process_payment(order_id: str, amount: float) -> bool:
    # Payment processing logic
    return True

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Activities with retry policy
        payment_result = await workflow.execute_activity(
            process_payment,
            args=[order_id, 99.99],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3)
        )

        if payment_result:
            await workflow.execute_activity(
                send_email,
                args=["customer@example.com", "Order Confirmed"],
                start_to_close_timeout=timedelta(seconds=10)
            )

        return f"Order {order_id} completed"
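
The `RetryPolicy(maximum_attempts=3)` above leaves the other retry settings at their defaults. As a rough illustration of what that means for timing, the sketch below computes the wait before each retry, assuming the documented defaults of a 1 second initial interval, a backoff coefficient of 2.0, and a maximum interval of 100 times the initial interval. The helper `retry_delays` is hypothetical and not part of the Temporal SDK; the Temporal service schedules the actual retries.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Waits before each retry when `maximum_attempts` total attempts are allowed."""
    if maximum_interval is None:
        # Assumed default: cap each wait at 100x the initial interval.
        maximum_interval = initial_interval * 100

    delays = []
    # The first attempt runs immediately, so there are (attempts - 1) waits.
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


for wait in retry_delays(3):
    print(wait.total_seconds())
```

With three attempts there are two waits, so a payment activity that keeps failing gives up a few seconds after it starts, plus the time spent in each attempt.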

Worker Setup

python
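import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# OrderWorkflow, send_email, and process_payment are the definitions
# from the Workflow Definition section above.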

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="order-queue",
        workflows=[OrderWorkflow],
        activities=[send_email, process_payment]
    )

    await worker.run()

asyncio.run(main())

Start Workflow

python
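# Assumes asyncio, Client, and OrderWorkflow are imported as in the sections above
async def start_order():
    client = await Client.connect("localhost:7233")

    # execute_workflow starts the workflow and waits for its result
    result = await client.execute_workflow(
        OrderWorkflow.run,
        "order-123",
        id="order-workflow-123",
        task_queue="order-queue"
    )

    print(f"Result: {result}")

asyncio.run(start_order())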

Temporal CLI

bash
# Start workflow (explicit ID so the commands below can reference it)
temporal workflow start \
  --workflow-id order-workflow-123 \
  --task-queue order-queue \
  --type OrderWorkflow \
  --input '"order-123"'

# List workflows
temporal workflow list

# Describe workflow
temporal workflow describe --workflow-id order-workflow-123

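# Signal workflow (the workflow must define a "cancel" signal handler;
# the OrderWorkflow example above does not)
temporal workflow signal \
  --workflow-id order-workflow-123 \
  --name cancel \
  --input '"reason"'

# Query workflow (the workflow must define a "status" query handler)
temporal workflow query \
  --workflow-id order-workflow-123 \
  --name status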

Event-Driven Patterns

Event Sourcing

python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List
from uuid import uuid4

@dataclass
class Event:
    id: str
    timestamp: datetime
    type: str
    data: dict

class OrderAggregate:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = "pending"
        self.items = []
        self.events: List[Event] = []  # new events not yet persisted

    def apply(self, event: Event):
        # State changes only in response to events
        if event.type == "OrderCreated":
            self.status = "created"
            self.items = event.data["items"]
        elif event.type == "ItemAdded":
            self.items.append(event.data["item"])
        elif event.type == "OrderPaid":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"

    def add_item(self, item: dict):
        # Record the change as an event, then apply it to update state
        event = Event(
            id=str(uuid4()),
            timestamp=datetime.now(timezone.utc),
            type="ItemAdded",
            data={"item": item}
        )
        self.events.append(event)
        self.apply(event)
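
Event sourcing rebuilds current state by replaying the stored events in order. The aggregate above shows how events are recorded, so the following sketch shows the replay half under the same event types. `OrderState`, `from_events`, and `new_event` are hypothetical names for illustration, and the event history is made-up sample data.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable, List
from uuid import uuid4


@dataclass(frozen=True)
class Event:
    id: str
    timestamp: datetime
    type: str
    data: dict


def new_event(event_type: str, data: dict) -> Event:
    """Build an event with a fresh ID and a UTC timestamp."""
    return Event(str(uuid4()), datetime.now(timezone.utc), event_type, data)


class OrderState:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = "pending"
        self.items: List[dict] = []

    def apply(self, event: Event) -> None:
        if event.type == "OrderCreated":
            self.status = "created"
            self.items = list(event.data["items"])
        elif event.type == "ItemAdded":
            self.items.append(event.data["item"])
        elif event.type == "OrderPaid":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"

    @classmethod
    def from_events(cls, order_id: str, events: Iterable[Event]) -> "OrderState":
        """Rebuild state from scratch by applying every event in order."""
        state = cls(order_id)
        for event in events:
            state.apply(event)
        return state


# Sample history, as it might be loaded from an event store.
history = [
    new_event("OrderCreated", {"items": [{"sku": "A1"}]}),
    new_event("ItemAdded", {"item": {"sku": "B2"}}),
    new_event("OrderPaid", {}),
]
order = OrderState.from_events("order-123", history)
print(order.status, order.items)
```

Because state is derived only from events, replaying a prefix of the history gives the order as it looked at that point in time.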

CQRS Pattern

python
# Order, event_store, nc (a connected NATS client), and read_db are
# application-level objects assumed to exist; they are not defined here.

# Command Handler
class CreateOrderCommand:
    def __init__(self, customer_id: str, items: list):
        self.customer_id = customer_id
        self.items = items

async def handle_create_order(cmd: CreateOrderCommand):
    order = Order.create(cmd.customer_id, cmd.items)
    # Persist the events first, then notify subscribers
    await event_store.append(order.id, order.events)
    # NATS payloads are bytes; assumes to_json() returns a str
    await nc.publish("orders.created", order.to_json().encode())

# Query Handler (separate read model)
async def get_order_summary(order_id: str) -> dict:
    return await read_db.query(
        "SELECT * FROM order_summaries WHERE id = $1",
        order_id
    )
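
The handlers above depend on external stores, so here is a self-contained, in-memory sketch of the same split: commands append events on the write side, a projection turns those events into a denormalized read model, and queries touch only the read model. All names (`event_log`, `order_summaries`, `project`) are assumptions for illustration. A real system would typically deliver events to the projection asynchronously, for example over NATS.

```python
from typing import Dict, List, Optional

event_log: List[dict] = []               # write side: append-only events
order_summaries: Dict[str, dict] = {}    # read side: denormalized view


def project(event: dict) -> None:
    """Update the read model from a single event."""
    if event["type"] == "OrderCreated":
        order_summaries[event["order_id"]] = {
            "customer_id": event["customer_id"],
            "item_count": len(event["items"]),
            "status": "created",
        }


def handle_create_order(order_id: str, customer_id: str, items: list) -> None:
    """Command handler: record the event, then let the projection catch up."""
    event = {
        "type": "OrderCreated",
        "order_id": order_id,
        "customer_id": customer_id,
        "items": items,
    }
    event_log.append(event)
    project(event)  # called inline here; usually done by a subscriber


def get_order_summary(order_id: str) -> Optional[dict]:
    """Query handler: reads only the read model, never the event log."""
    return order_summaries.get(order_id)


handle_create_order("order-1", "cust-42", ["book", "pen"])
print(get_order_summary("order-1"))
```

Keeping the projection as a separate function means the read model can be rebuilt at any time by running `project` over the whole `event_log`.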

Saga Pattern

python
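from datetime import timedelta
from temporalio import workflow

# reserve_inventory, process_payment, create_shipment, release_inventory, and
# refund_payment are activities assumed to be defined elsewhere with
# @activity.defn; their argument lists here are illustrative.

@workflow.defn
class OrderSaga:
    @workflow.run
    async def run(self, order: dict) -> str:
        timeout = timedelta(seconds=30)
        # Initialize so the except block can tell which steps completed
        reservation = None
        payment = None
        try:
            # Step 1: Reserve inventory
            reservation = await workflow.execute_activity(
                reserve_inventory, args=[order["items"]],
                start_to_close_timeout=timeout,
            )

            # Step 2: Process payment
            payment = await workflow.execute_activity(
                process_payment, args=[order["total"]],
                start_to_close_timeout=timeout,
            )

            # Step 3: Ship order
            await workflow.execute_activity(
                create_shipment, args=[order["address"]],
                start_to_close_timeout=timeout,
            )

            return "Order completed"

        except Exception:
            # Compensating transactions, most recent step first
            if payment is not None:
                await workflow.execute_activity(
                    refund_payment, args=[payment],
                    start_to_close_timeout=timeout,
                )
            if reservation is not None:
                await workflow.execute_activity(
                    release_inventory, args=[reservation],
                    start_to_close_timeout=timeout,
                )
            raise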
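
The compensation logic in the saga can be generalized: track each completed step together with its undo action, and on failure run the undo actions in reverse order. The sketch below shows that idea in plain Python without Temporal. `run_saga`, the step tuples, and the failing shipment are all hypothetical, and unlike the workflow above it returns `False` instead of re-raising so the outcome is easy to inspect.

```python
from typing import Callable, List, Tuple

# (name, action, compensation); illustrative only, not a Temporal API
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, undo completed steps newest-first."""
    completed: List[Step] = []
    try:
        for step in steps:
            name, action, _ = step
            action()
            log.append(f"done:{name}")
            completed.append(step)
    except Exception as exc:
        log.append(f"failed:{exc}")
        for name, _, compensate in reversed(completed):
            compensate()
            log.append(f"undone:{name}")
        return False
    return True


def fail_shipping() -> None:
    raise RuntimeError("no carrier available")


saga_log: List[str] = []
succeeded = run_saga(
    [
        ("reserve_inventory", lambda: None, lambda: None),
        ("process_payment", lambda: None, lambda: None),
        ("create_shipment", fail_shipping, lambda: None),
    ],
    saga_log,
)
print(succeeded, saga_log)
```

Only steps that finished are compensated, so the failed shipment itself is not undone, while the payment is refunded before the inventory is released.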
