Agent skill
add-outbox-pattern
Add transactional outbox pattern for reliable event publishing with RavenDB (project)
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/add-outbox-pattern
SKILL.md
Add Outbox Pattern Skill
Implement the transactional outbox pattern for reliable event publishing in NovaTune using RavenDB.
Overview
The outbox pattern ensures reliable, at-least-once event publishing (consumers should be idempotent) by:
- Writing events to an `OutboxMessages` collection in the same transaction as domain changes
- Having a background processor read and publish pending events, then mark them as processed
- Guaranteeing no lost events even if Kafka/Redpanda is temporarily unavailable
Steps
1. Create OutboxMessage Model
Location: src/NovaTuneApp/NovaTuneApp.ApiService/Models/OutboxMessage.cs
namespace NovaTuneApp.ApiService.Models;
/// <summary>
/// Outbox document describing one event that is waiting to be published to the
/// message broker (or has already been published, once <see cref="ProcessedAt"/> is set).
/// </summary>
public sealed class OutboxMessage
{
    /// <summary>
    /// RavenDB document ID (e.g., "OutboxMessages/01HXK..."). Defaults to an empty string.
    /// </summary>
    public string Id { get; init; } = string.Empty;

    /// <summary>
    /// Name of the event type; used for deserialization and routing.
    /// </summary>
    public required string EventType { get; init; }

    /// <summary>
    /// The event itself, serialized as JSON.
    /// </summary>
    public required string Payload { get; init; }

    /// <summary>
    /// Kafka partition key, used for ordering guarantees.
    /// </summary>
    public required string PartitionKey { get; init; }

    /// <summary>
    /// Name of the target topic, without the prefix. Optional.
    /// </summary>
    public string? Topic { get; init; }

    /// <summary>
    /// Timestamp at which this outbox message was created.
    /// </summary>
    public required DateTimeOffset CreatedAt { get; init; }

    /// <summary>
    /// Timestamp at which the message was published; <c>null</c> while it is still pending.
    /// </summary>
    public DateTimeOffset? ProcessedAt { get; set; }

    /// <summary>
    /// How many times publication has been attempted.
    /// </summary>
    public int Attempts { get; set; }

    /// <summary>
    /// Error message recorded by the most recent failed publication, if any.
    /// </summary>
    public string? LastError { get; set; }
}
2. Create RavenDB Index for Pending Messages
Location: src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Indexes/OutboxMessages_ByPending.cs
using Raven.Client.Documents.Indexes;
using NovaTuneApp.ApiService.Models;
namespace NovaTuneApp.ApiService.Infrastructure.Indexes;
/// <summary>
/// Static RavenDB index over <see cref="OutboxMessage"/> documents that have not been
/// published yet (<c>ProcessedAt == null</c>).
/// </summary>
/// <remarks>
/// Indexed fields: <c>CreatedAt</c>, <c>Attempts</c> and <c>ProcessedAt</c>.
/// <c>ProcessedAt</c> is always <c>null</c> for the entries in this index, but it is
/// projected anyway so that queries which filter on it (as the outbox processor's
/// pending-messages query does) reference a field the index actually contains;
/// a static index can only be queried on the fields it projects.
/// </remarks>
public class OutboxMessages_ByPending : AbstractIndexCreationTask<OutboxMessage>
{
    public OutboxMessages_ByPending()
    {
        Map = messages => from msg in messages
                          where msg.ProcessedAt == null
                          select new
                          {
                              msg.CreatedAt,
                              msg.Attempts,
                              msg.ProcessedAt
                          };
    }
}
3. Create Outbox Service Interface
Location: src/NovaTuneApp/NovaTuneApp.ApiService/Services/IOutboxService.cs
namespace NovaTuneApp.ApiService.Services;
/// <summary>
/// Abstraction used by application code to enqueue events into the outbox.
/// </summary>
public interface IOutboxService
{
    /// <summary>
    /// Adds an event to the outbox as part of the current session.
    /// Call this before <c>SaveChangesAsync()</c> so the event is saved with the session's other changes.
    /// </summary>
    /// <typeparam name="TEvent">Type of the event being written.</typeparam>
    /// <param name="event">The event to enqueue.</param>
    /// <param name="partitionKey">Partition key to publish the event with.</param>
    /// <param name="topic">Optional target topic name; <c>null</c> when not specified.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <returns>A task that completes once the event has been added to the session.</returns>
    Task WriteAsync<TEvent>(
        TEvent @event,
        string partitionKey,
        string? topic = null,
        CancellationToken ct = default)
        where TEvent : class;
}
4. Implement Outbox Service
Location: src/NovaTuneApp/NovaTuneApp.ApiService/Services/OutboxService.cs
using System.Text.Json;
using Raven.Client.Documents.Session;
using NovaTuneApp.ApiService.Models;
namespace NovaTuneApp.ApiService.Services;
/// <summary>
/// Default <see cref="IOutboxService"/>: turns an event into an <see cref="OutboxMessage"/>
/// and stores it in the injected RavenDB session. Nothing is persisted until the owner
/// of that session calls <c>SaveChangesAsync()</c>.
/// </summary>
public class OutboxService : IOutboxService
{
    private readonly IAsyncDocumentSession _session;
    private readonly ILogger<OutboxService> _logger;

    /// <summary>Creates the service on top of the given session and logger.</summary>
    public OutboxService(
        IAsyncDocumentSession session,
        ILogger<OutboxService> logger)
    {
        _session = session;
        _logger = logger;
    }

    /// <summary>
    /// Serializes <paramref name="event"/> to JSON and stores it as a pending outbox message
    /// in the current session.
    /// </summary>
    /// <typeparam name="TEvent">Compile-time type of the event.</typeparam>
    /// <param name="event">The event to enqueue; must not be <c>null</c>.</param>
    /// <param name="partitionKey">Partition key stored on the message; must not be <c>null</c>.</param>
    /// <param name="topic">Optional target topic; stored as-is (may be <c>null</c>).</param>
    /// <param name="ct">Token forwarded to the session's <c>StoreAsync</c>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="event"/> or <paramref name="partitionKey"/> is <c>null</c>.
    /// </exception>
    public async Task WriteAsync<TEvent>(
        TEvent @event,
        string partitionKey,
        string? topic = null,
        CancellationToken ct = default) where TEvent : class
    {
        // A null event would otherwise be stored silently as the JSON payload "null".
        ArgumentNullException.ThrowIfNull(@event);
        ArgumentNullException.ThrowIfNull(partitionKey);

        // Use the runtime type rather than typeof(TEvent): when the caller holds the event
        // in a base-typed variable, the static type would record the wrong event name and
        // System.Text.Json would drop the derived type's properties.
        var runtimeType = @event.GetType();
        var eventType = runtimeType.Name;

        var outboxMessage = new OutboxMessage
        {
            Id = $"OutboxMessages/{Ulid.NewUlid()}",
            EventType = eventType,
            Payload = JsonSerializer.Serialize(@event, runtimeType),
            PartitionKey = partitionKey,
            Topic = topic,
            CreatedAt = DateTimeOffset.UtcNow
        };

        await _session.StoreAsync(outboxMessage, ct);

        _logger.LogDebug(
            "Queued {EventType} for partition {PartitionKey}",
            eventType, partitionKey);
    }
}
5. Create Outbox Processor Background Service
Location: src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Services/OutboxProcessorService.cs
using System.Text.Json;
using KafkaFlow.Producers;
using Microsoft.Extensions.Options;
using Raven.Client.Documents;
using Raven.Client.Documents.Session;
using NovaTuneApp.ApiService.Configuration;
using NovaTuneApp.ApiService.Models;
using NovaTuneApp.ApiService.Infrastructure.Indexes;
namespace NovaTuneApp.ApiService.Infrastructure.Services;
/// <summary>
/// Hosted background worker that periodically loads pending <see cref="OutboxMessage"/>
/// documents from RavenDB, publishes them through the KafkaFlow producer named "default",
/// and records the outcome (processed timestamp, or attempt count and last error) on each document.
/// </summary>
/// <remarks>
/// Delivery is at-least-once: a message is published before it is marked as processed, so a
/// failure between those two steps results in the message being published again.
/// </remarks>
public class OutboxProcessorService : BackgroundService
{
    private const string ProducerName = "default";

    private readonly IServiceProvider _serviceProvider;
    private readonly IOptions<OutboxOptions> _options;
    private readonly IOptions<NovaTuneOptions> _novatuneOptions;
    private readonly ILogger<OutboxProcessorService> _logger;

    /// <summary>Creates the processor with its service provider, options and logger.</summary>
    public OutboxProcessorService(
        IServiceProvider serviceProvider,
        IOptions<OutboxOptions> options,
        IOptions<NovaTuneOptions> novatuneOptions,
        ILogger<OutboxProcessorService> logger)
    {
        _serviceProvider = serviceProvider;
        _options = options;
        _novatuneOptions = novatuneOptions;
        _logger = logger;
    }

    /// <summary>
    /// Polling loop: processes one batch, waits <c>PollingInterval</c>, and repeats until
    /// <paramref name="stoppingToken"/> is cancelled. Returns immediately when the processor
    /// is disabled in <see cref="OutboxOptions"/>.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Value.Enabled)
        {
            _logger.LogInformation("Outbox processor is disabled");
            return;
        }

        _logger.LogInformation(
            "Outbox processor starting with {Interval} interval",
            _options.Value.PollingInterval);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessBatchAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                // A failing batch must not kill the worker; log it and try again on the next cycle.
                _logger.LogError(ex, "Error processing outbox");
            }

            try
            {
                await Task.Delay(_options.Value.PollingInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Shutdown requested while waiting: leave the loop instead of surfacing the
                // cancellation as an exception from ExecuteAsync.
                break;
            }
        }
    }

    /// <summary>
    /// Loads up to <c>BatchSize</c> pending messages (oldest first, fewer than <c>MaxAttempts</c>
    /// attempts), publishes each one, and saves the updated documents in a single call.
    /// </summary>
    /// <param name="ct">Cancels the query and stops the loop between messages.</param>
    /// <exception cref="InvalidOperationException">The "default" producer is not available.</exception>
    private async Task ProcessBatchAsync(CancellationToken ct)
    {
        using var scope = _serviceProvider.CreateScope();
        var store = scope.ServiceProvider.GetRequiredService<IDocumentStore>();
        var producerAccessor = scope.ServiceProvider.GetRequiredService<IProducerAccessor>();

        // Resolve the producer once, up front. A missing producer is a configuration error and
        // should fail the whole batch instead of consuming a retry attempt on every message.
        var producer = producerAccessor.GetProducer(ProducerName)
            ?? throw new InvalidOperationException(
                $"KafkaFlow producer '{ProducerName}' is not registered.");

        // Copy option values into locals so the query expression only captures simple values.
        var maxAttempts = _options.Value.MaxAttempts;
        var batchSize = _options.Value.BatchSize;

        using var session = store.OpenAsyncSession();

        // OutboxMessages_ByPending only maps documents whose ProcessedAt is null, so the query
        // does not repeat that condition (it would be a field the index does not project).
        var pendingMessages = await session
            .Query<OutboxMessage, OutboxMessages_ByPending>()
            .Where(m => m.Attempts < maxAttempts)
            .OrderBy(m => m.CreatedAt)
            .Take(batchSize)
            .ToListAsync(ct);

        if (pendingMessages.Count == 0) return;

        _logger.LogDebug("Processing {Count} outbox messages", pendingMessages.Count);

        var topicPrefix = _novatuneOptions.Value.TopicPrefix;

        // Partition keys that had a failure in this batch. Later messages with the same key are
        // held back so they cannot overtake the failed one and break per-key ordering.
        var blockedPartitionKeys = new HashSet<string>(StringComparer.Ordinal);

        foreach (var message in pendingMessages)
        {
            if (ct.IsCancellationRequested)
            {
                break;
            }

            // The index is updated asynchronously, so a query may still return a document that
            // was completed by a previous batch. Re-check the loaded document before publishing.
            if (message.ProcessedAt is not null || message.Attempts >= maxAttempts)
            {
                continue;
            }

            if (blockedPartitionKeys.Contains(message.PartitionKey))
            {
                continue;
            }

            try
            {
                var topic = message.Topic ?? GetDefaultTopic(message.EventType);
                var fullTopic = $"{topicPrefix}-{topic}";

                await producer.ProduceAsync(
                    fullTopic,
                    message.PartitionKey,
                    message.Payload);

                message.ProcessedAt = DateTimeOffset.UtcNow;

                _logger.LogDebug(
                    "Published {EventType} to {Topic}",
                    message.EventType, fullTopic);
            }
            catch (Exception ex)
            {
                message.Attempts++;
                message.LastError = ex.Message;
                blockedPartitionKeys.Add(message.PartitionKey);

                _logger.LogWarning(
                    ex,
                    "Failed to publish {EventType} (attempt {Attempt})",
                    message.EventType, message.Attempts);
            }
        }

        // Deliberately not cancellable: messages published above must be marked as processed
        // even when shutdown was requested mid-batch, otherwise they are published again.
        await session.SaveChangesAsync(CancellationToken.None);
    }

    /// <summary>
    /// Maps an event type name to the topic used when the message does not specify one.
    /// Unknown event types go to "events".
    /// </summary>
    private static string GetDefaultTopic(string eventType) => eventType switch
    {
        nameof(TrackDeletedEvent) => "track-deletions",
        nameof(AudioUploadedEvent) => "audio-events",
        _ => "events"
    };
}
6. Add Configuration Options
Location: src/NovaTuneApp/NovaTuneApp.ApiService/Configuration/OutboxOptions.cs
namespace NovaTuneApp.ApiService.Configuration;
/// <summary>
/// Settings for the outbox, bound from the configuration section named by <see cref="SectionName"/>.
/// </summary>
public class OutboxOptions
{
    /// <summary>Name of the configuration section these options are read from.</summary>
    public const string SectionName = "Outbox";

    /// <summary>
    /// How often the outbox processor polls for work.
    /// Default: 1 second.
    /// </summary>
    public TimeSpan PollingInterval { get; set; } = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Upper bound on the number of messages handled in one batch.
    /// Default: 100.
    /// </summary>
    public int BatchSize { get; set; } = 100;

    /// <summary>
    /// Number of publication attempts after which a message is given up on.
    /// Default: 5.
    /// </summary>
    public int MaxAttempts { get; set; } = 5;

    /// <summary>
    /// Turns outbox processing on or off.
    /// Default: true.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// How long processed messages are retained.
    /// Default: 7 days.
    /// </summary>
    public TimeSpan RetentionPeriod { get; set; } = TimeSpan.FromDays(7);
}
7. Register Services in Program.cs
// Configuration: bind OutboxOptions from the "Outbox" section (OutboxOptions.SectionName).
builder.Services.Configure<OutboxOptions>(
builder.Configuration.GetSection(OutboxOptions.SectionName));
// Services: IOutboxService is scoped, so each scope gets its own OutboxService.
// NOTE(review): assumes IAsyncDocumentSession is also registered as scoped, so the outbox
// write and the domain change share one session — confirm against the RavenDB registration.
builder.Services.AddScoped<IOutboxService, OutboxService>();
// Background processor: hosted service that polls the outbox and publishes pending messages.
builder.Services.AddHostedService<OutboxProcessorService>();
8. Add Configuration to appsettings.json
{
"Outbox": {
"PollingInterval": "00:00:01",
"BatchSize": 100,
"MaxAttempts": 5,
"Enabled": true,
"RetentionPeriod": "7.00:00:00"
}
}
Usage Example
/// <summary>
/// Usage example: soft-deletes a track and enqueues a <c>TrackDeletedEvent</c> in the same
/// RavenDB session, so both are persisted by a single <c>SaveChangesAsync</c> call.
/// </summary>
public class TrackManagementService : ITrackManagementService
{
    private readonly IAsyncDocumentSession _session;
    private readonly IOutboxService _outboxService;

    /// <summary>
    /// Marks the track as deleted, schedules its deletion 30 days later, and writes the
    /// matching event to the outbox before saving.
    /// </summary>
    public async Task DeleteTrackAsync(string trackId, string userId, CancellationToken ct)
    {
        var track = await _session.LoadAsync<Track>($"Tracks/{trackId}", ct);
        // ... validation ...

        // Soft-delete track
        track.Status = TrackStatus.Deleted;
        track.DeletedAt = DateTimeOffset.UtcNow;
        track.ScheduledDeletionAt = track.DeletedAt.Value.AddDays(30);

        // Enqueue the event in the outbox; it is part of the same session as the track change.
        var deletionEvent = new TrackDeletedEvent
        {
            TrackId = trackId,
            UserId = userId,
            ObjectKey = track.ObjectKey,
            // ... other fields
        };
        await _outboxService.WriteAsync(deletionEvent, trackId, topic: null, ct);

        // One save persists the track update and the outbox message together.
        await _session.SaveChangesAsync(ct);
    }
}
Benefits
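- At-least-once delivery: Events are stored atomically with domain changes, so none are lost; a failure between publishing and marking a message as processed can produce a duplicate, so consumers should be idempotent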
- Resilience: Events published even if broker temporarily unavailable
- Ordering: Partition key ensures order within entity
- Retries: Failed messages are retried on each polling cycle until `MaxAttempts` is reached (the processor shown above does not implement exponential backoff)
- Observability: Failed messages visible in RavenDB
Cleanup
Add a scheduled task to delete processed messages older than retention period:
// Place in OutboxProcessorService or in a dedicated cleanup service.
// Anything processed before this instant is past the retention period.
var cutoff = DateTimeOffset.UtcNow - _options.Value.RetentionPeriod;

// Fetch at most 1000 expired, already-processed messages per run.
var expiredMessages = await session
    .Query<OutboxMessage>()
    .Where(m => m.ProcessedAt != null && m.ProcessedAt < cutoff)
    .Take(1000)
    .ToListAsync(ct);

foreach (var expired in expiredMessages)
{
    session.Delete(expired);
}

// The deletions are sent to the server in one SaveChangesAsync call.
await session.SaveChangesAsync(ct);
Didn't find tool you were looking for?