Agent skill
event-driven-architect
Designs event-driven architectures with event sourcing, CQRS, pub/sub patterns, and domain events for decoupled systems. Use when users request "event sourcing", "CQRS", "domain events", "pub/sub", or "event-driven".
Install this agent skill to your Project
npx add-skill https://github.com/patricio0312rev/skills/tree/main/backend/event-driven-architect
SKILL.md
Event-Driven Architect
Build decoupled, scalable systems with event-driven patterns.
Core Workflow
- Identify domain events: Define what happened
- Design event schema: Structure event payloads
- Implement event bus: Publish and subscribe
- Add event handlers: React to events
- Consider CQRS: Separate reads and writes
- Enable event sourcing: Store event history
Event Fundamentals
Event Structure
// events/base.ts
export interface DomainEvent<T = unknown> {
id: string;
type: string;
aggregateId: string;
aggregateType: string;
payload: T;
metadata: {
timestamp: Date;
version: number;
correlationId?: string;
causationId?: string;
userId?: string;
};
}
// Type-safe event creator
export function createEvent<T>(
type: string,
aggregateType: string,
aggregateId: string,
payload: T,
metadata?: Partial<DomainEvent['metadata']>
): DomainEvent<T> {
return {
id: crypto.randomUUID(),
type,
aggregateType,
aggregateId,
payload,
metadata: {
timestamp: new Date(),
version: 1,
...metadata,
},
};
}
Define Domain Events
// events/order.events.ts
export interface OrderCreatedPayload {
customerId: string;
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
totalAmount: number;
shippingAddress: Address;
}
export interface OrderPaidPayload {
paymentId: string;
amount: number;
method: 'card' | 'bank' | 'wallet';
}
export interface OrderShippedPayload {
trackingNumber: string;
carrier: string;
estimatedDelivery: string;
}
export interface OrderCancelledPayload {
reason: string;
cancelledBy: string;
refundAmount?: number;
}
// Event types
export type OrderEvent =
| DomainEvent<OrderCreatedPayload> & { type: 'OrderCreated' }
| DomainEvent<OrderPaidPayload> & { type: 'OrderPaid' }
| DomainEvent<OrderShippedPayload> & { type: 'OrderShipped' }
| DomainEvent<OrderCancelledPayload> & { type: 'OrderCancelled' };
// Event creators
export const OrderEvents = {
created: (orderId: string, payload: OrderCreatedPayload) =>
createEvent('OrderCreated', 'Order', orderId, payload),
paid: (orderId: string, payload: OrderPaidPayload) =>
createEvent('OrderPaid', 'Order', orderId, payload),
shipped: (orderId: string, payload: OrderShippedPayload) =>
createEvent('OrderShipped', 'Order', orderId, payload),
cancelled: (orderId: string, payload: OrderCancelledPayload) =>
createEvent('OrderCancelled', 'Order', orderId, payload),
};
Event Bus
In-Memory Event Bus
// events/event-bus.ts
import { EventEmitter } from 'events';
import { DomainEvent } from './base';
type EventHandler<T = unknown> = (event: DomainEvent<T>) => Promise<void>;
class EventBus {
private emitter = new EventEmitter();
private handlers = new Map<string, EventHandler[]>();
async publish<T>(event: DomainEvent<T>): Promise<void> {
console.log(`Publishing event: ${event.type}`, event);
// Store event (for event sourcing)
await this.storeEvent(event);
// Emit to handlers
this.emitter.emit(event.type, event);
this.emitter.emit('*', event); // Wildcard for all events
}
async publishAll(events: DomainEvent[]): Promise<void> {
for (const event of events) {
await this.publish(event);
}
}
subscribe<T>(eventType: string, handler: EventHandler<T>): () => void {
const wrappedHandler = async (event: DomainEvent<T>) => {
try {
await handler(event);
} catch (error) {
console.error(`Error handling ${eventType}:`, error);
// Could emit to dead letter queue here
}
};
this.emitter.on(eventType, wrappedHandler);
// Return unsubscribe function
return () => {
this.emitter.off(eventType, wrappedHandler);
};
}
subscribeAll(handler: EventHandler): () => void {
return this.subscribe('*', handler);
}
private async storeEvent(event: DomainEvent): Promise<void> {
await db.event.create({
data: {
id: event.id,
type: event.type,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
payload: event.payload as any,
metadata: event.metadata as any,
createdAt: event.metadata.timestamp,
},
});
}
}
export const eventBus = new EventBus();
Redis-Based Event Bus
// events/redis-event-bus.ts
import { Redis } from 'ioredis';
import { DomainEvent } from './base';
const publisher = new Redis(process.env.REDIS_URL!);
const subscriber = new Redis(process.env.REDIS_URL!);
class RedisEventBus {
private handlers = new Map<string, Set<(event: DomainEvent) => Promise<void>>>();
constructor() {
subscriber.on('message', async (channel, message) => {
const event = JSON.parse(message) as DomainEvent;
const handlers = this.handlers.get(channel) || new Set();
for (const handler of handlers) {
try {
await handler(event);
} catch (error) {
console.error(`Error handling ${event.type}:`, error);
}
}
});
}
async publish(event: DomainEvent): Promise<void> {
const channel = `events:${event.type}`;
await publisher.publish(channel, JSON.stringify(event));
// Also store in stream for replay
await publisher.xadd(
`stream:${event.aggregateType}`,
'*',
'event',
JSON.stringify(event)
);
}
subscribe(eventType: string, handler: (event: DomainEvent) => Promise<void>): () => void {
const channel = `events:${eventType}`;
if (!this.handlers.has(channel)) {
this.handlers.set(channel, new Set());
subscriber.subscribe(channel);
}
this.handlers.get(channel)!.add(handler);
return () => {
this.handlers.get(channel)?.delete(handler);
};
}
}
export const eventBus = new RedisEventBus();
Event Handlers
Handler Registration
// handlers/order.handlers.ts
import { eventBus } from '../events/event-bus';
import { OrderEvent } from '../events/order.events';
// Email notification on order created
eventBus.subscribe<OrderCreatedPayload>('OrderCreated', async (event) => {
await emailService.send({
to: await getUserEmail(event.payload.customerId),
template: 'order-confirmation',
data: {
orderId: event.aggregateId,
items: event.payload.items,
total: event.payload.totalAmount,
},
});
});
// Update inventory on order created
eventBus.subscribe<OrderCreatedPayload>('OrderCreated', async (event) => {
for (const item of event.payload.items) {
await inventoryService.reserve(item.productId, item.quantity);
}
});
// Analytics tracking
eventBus.subscribe<OrderPaidPayload>('OrderPaid', async (event) => {
await analytics.track('order_completed', {
orderId: event.aggregateId,
amount: event.payload.amount,
paymentMethod: event.payload.method,
});
});
// Notify shipping on order paid
eventBus.subscribe<OrderPaidPayload>('OrderPaid', async (event) => {
await shippingService.createShipment(event.aggregateId);
});
// Handle cancellation
eventBus.subscribe<OrderCancelledPayload>('OrderCancelled', async (event) => {
// Release inventory
const order = await orderRepository.findById(event.aggregateId);
for (const item of order.items) {
await inventoryService.release(item.productId, item.quantity);
}
// Process refund
if (event.payload.refundAmount) {
await paymentService.refund(event.aggregateId, event.payload.refundAmount);
}
// Send cancellation email
await emailService.send({
to: await getUserEmail(order.customerId),
template: 'order-cancelled',
data: {
orderId: event.aggregateId,
reason: event.payload.reason,
},
});
});
Event Sourcing
Aggregate with Events
// aggregates/order.aggregate.ts
import { DomainEvent } from '../events/base';
import { OrderEvents, OrderCreatedPayload, OrderPaidPayload } from '../events/order.events';
interface OrderItem {
productId: string;
quantity: number;
price: number;
}
type OrderStatus = 'pending' | 'paid' | 'shipped' | 'delivered' | 'cancelled';
export class OrderAggregate {
private _id: string;
private _status: OrderStatus = 'pending';
private _items: OrderItem[] = [];
private _totalAmount: number = 0;
private _customerId: string = '';
private _version: number = 0;
private uncommittedEvents: DomainEvent[] = [];
get id() { return this._id; }
get status() { return this._status; }
get items() { return [...this._items]; }
get version() { return this._version; }
constructor(id?: string) {
this._id = id || crypto.randomUUID();
}
// Command: Create order
static create(customerId: string, items: OrderItem[], shippingAddress: Address): OrderAggregate {
const order = new OrderAggregate();
const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
order.apply(
OrderEvents.created(order._id, {
customerId,
items,
totalAmount,
shippingAddress,
})
);
return order;
}
// Command: Pay order
pay(paymentId: string, amount: number, method: 'card' | 'bank' | 'wallet'): void {
if (this._status !== 'pending') {
throw new Error('Order cannot be paid in current status');
}
if (amount !== this._totalAmount) {
throw new Error('Payment amount does not match order total');
}
this.apply(
OrderEvents.paid(this._id, { paymentId, amount, method })
);
}
// Command: Cancel order
cancel(reason: string, cancelledBy: string): void {
if (['shipped', 'delivered', 'cancelled'].includes(this._status)) {
throw new Error('Order cannot be cancelled in current status');
}
const refundAmount = this._status === 'paid' ? this._totalAmount : undefined;
this.apply(
OrderEvents.cancelled(this._id, { reason, cancelledBy, refundAmount })
);
}
// Apply event and track for persistence
private apply(event: DomainEvent): void {
this.applyEvent(event);
this.uncommittedEvents.push(event);
}
// Apply event to state (used for replay too)
private applyEvent(event: DomainEvent): void {
switch (event.type) {
case 'OrderCreated':
const created = event.payload as OrderCreatedPayload;
this._customerId = created.customerId;
this._items = created.items;
this._totalAmount = created.totalAmount;
this._status = 'pending';
break;
case 'OrderPaid':
this._status = 'paid';
break;
case 'OrderShipped':
this._status = 'shipped';
break;
case 'OrderCancelled':
this._status = 'cancelled';
break;
}
this._version++;
}
// Get and clear uncommitted events
getUncommittedEvents(): DomainEvent[] {
const events = [...this.uncommittedEvents];
this.uncommittedEvents = [];
return events;
}
// Rebuild from events
static fromEvents(events: DomainEvent[]): OrderAggregate {
if (events.length === 0) {
throw new Error('Cannot rebuild aggregate from empty events');
}
const order = new OrderAggregate(events[0].aggregateId);
for (const event of events) {
order.applyEvent(event);
}
return order;
}
}
Event Store Repository
// repositories/event-store.repository.ts
import { db } from '../lib/db';
import { DomainEvent } from '../events/base';
import { eventBus } from '../events/event-bus';
export class EventStoreRepository<T extends { id: string; getUncommittedEvents(): DomainEvent[] }> {
constructor(
private aggregateType: string,
private reconstruct: (events: DomainEvent[]) => T
) {}
async save(aggregate: T): Promise<void> {
const events = aggregate.getUncommittedEvents();
if (events.length === 0) return;
// Store events
await db.event.createMany({
data: events.map((event) => ({
id: event.id,
type: event.type,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
payload: event.payload as any,
metadata: event.metadata as any,
createdAt: event.metadata.timestamp,
})),
});
// Publish events
await eventBus.publishAll(events);
}
async findById(id: string): Promise<T | null> {
const events = await db.event.findMany({
where: {
aggregateId: id,
aggregateType: this.aggregateType,
},
orderBy: { createdAt: 'asc' },
});
if (events.length === 0) return null;
return this.reconstruct(
events.map((e) => ({
id: e.id,
type: e.type,
aggregateId: e.aggregateId,
aggregateType: e.aggregateType,
payload: e.payload,
metadata: e.metadata as any,
}))
);
}
async getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]> {
const events = await db.event.findMany({
where: {
aggregateId,
aggregateType: this.aggregateType,
...(fromVersion && {
metadata: { path: ['version'], gte: fromVersion },
}),
},
orderBy: { createdAt: 'asc' },
});
return events.map((e) => ({
id: e.id,
type: e.type,
aggregateId: e.aggregateId,
aggregateType: e.aggregateType,
payload: e.payload,
metadata: e.metadata as any,
}));
}
}
// Usage
export const orderRepository = new EventStoreRepository(
'Order',
OrderAggregate.fromEvents
);
CQRS Pattern
Separate Command and Query
// commands/create-order.command.ts
export interface CreateOrderCommand {
customerId: string;
items: Array<{ productId: string; quantity: number }>;
shippingAddress: Address;
}
// Command handler
export async function handleCreateOrder(command: CreateOrderCommand): Promise<string> {
// Validate
const customer = await customerRepository.findById(command.customerId);
if (!customer) throw new Error('Customer not found');
// Get product prices
const items = await Promise.all(
command.items.map(async (item) => {
const product = await productRepository.findById(item.productId);
return {
productId: item.productId,
quantity: item.quantity,
price: product.price,
};
})
);
// Create aggregate and save
const order = OrderAggregate.create(
command.customerId,
items,
command.shippingAddress
);
await orderRepository.save(order);
return order.id;
}
// queries/order.queries.ts
// Read model - denormalized for fast queries
export interface OrderReadModel {
id: string;
status: string;
customerName: string;
customerEmail: string;
items: Array<{
productName: string;
quantity: number;
price: number;
}>;
totalAmount: number;
createdAt: Date;
paidAt?: Date;
shippedAt?: Date;
}
// Query handler
export async function getOrderById(orderId: string): Promise<OrderReadModel | null> {
return db.orderReadModel.findUnique({
where: { id: orderId },
});
}
export async function getOrdersByCustomer(customerId: string): Promise<OrderReadModel[]> {
return db.orderReadModel.findMany({
where: { customerId },
orderBy: { createdAt: 'desc' },
});
}
Read Model Projector
// projectors/order.projector.ts
import { eventBus } from '../events/event-bus';
// Project events to read model
eventBus.subscribe('OrderCreated', async (event) => {
const { customerId, items, totalAmount } = event.payload;
const customer = await db.customer.findUnique({ where: { id: customerId } });
await db.orderReadModel.create({
data: {
id: event.aggregateId,
status: 'pending',
customerId,
customerName: customer.name,
customerEmail: customer.email,
items: await enrichItems(items),
totalAmount,
createdAt: event.metadata.timestamp,
},
});
});
eventBus.subscribe('OrderPaid', async (event) => {
await db.orderReadModel.update({
where: { id: event.aggregateId },
data: {
status: 'paid',
paidAt: event.metadata.timestamp,
},
});
});
eventBus.subscribe('OrderShipped', async (event) => {
await db.orderReadModel.update({
where: { id: event.aggregateId },
data: {
status: 'shipped',
shippedAt: event.metadata.timestamp,
trackingNumber: event.payload.trackingNumber,
},
});
});
Best Practices
- Immutable events: Never modify stored events
- Descriptive event names: Past tense (OrderCreated, not CreateOrder)
- Include all context: Events should be self-contained
- Version events: Handle schema evolution
- Idempotent handlers: Handle duplicate events gracefully
- Separate concerns: Commands mutate, queries read
- Event versioning: Support backward compatibility
- Dead letter queue: Handle failed events
Output Checklist
Every event-driven system should include:
- Well-defined domain events
- Type-safe event payloads
- Event bus (in-memory or distributed)
- Event handlers with error handling
- Event store for persistence
- Aggregate with event sourcing (if needed)
- CQRS separation (if needed)
- Read model projectors
- Dead letter handling
- Event replay capability
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
rate-limiting-abuse-protection
Implements rate limiting and abuse prevention with per-route policies, IP/user-based limits, sliding windows, safe error responses, and observability. Use when adding "rate limiting", "API protection", "abuse prevention", or "DDoS protection".
rbac-permissions-builder
Implements role-based access control with permission matrix, route guards, policy functions, and UI permission hints. Provides middleware/guards, helper utilities, test suggestions, and permission checking patterns. Use when building "RBAC", "permissions", "access control", or "authorization".
websocket-realtime-builder
Implements real-time features using WebSockets with Socket.io, rooms, authentication, and reconnection handling. Use when users request "real-time updates", "WebSocket", "Socket.io", "live chat", or "push notifications".
webhook-receiver-hardener
Secures webhook receivers with signature verification, retry handling, deduplication, idempotency keys, and error responses. Provides verification code, dedupe storage strategy, runbook for incidents. Use when implementing "webhooks", "webhook security", "event receivers", or "third-party integrations".
auth-module-builder
Implements secure authentication patterns including login/registration, session management, JWT tokens, password hashing, cookie settings, and CSRF protection. Provides auth routes, middleware, security configurations, and threat model documentation. Use when building "authentication", "login system", "JWT auth", or "session management".
rest-to-graphql-migrator
Migrates REST APIs to GraphQL incrementally with schema stitching, REST datasources, and gradual endpoint migration. Use when users request "migrate to GraphQL", "REST to GraphQL", "GraphQL wrapper", or "API modernization".
Didn't find tool you were looking for?