This article is part of the Comprehensive Guide to Microservices Architecture in .NET Core, Cloud and Azure series.
The Saga Pattern
What is a Saga?
A sequence of local transactions where each transaction updates data within a single service. If a step fails, compensating transactions undo the changes to maintain data consistency across distributed systems.
Two Types:
- Choreography: Each service produces and listens to events (decentralized)
- Orchestration: A central coordinator directs the saga (centralized)
Example with Orchestration (using MassTransit)
// Saga State (using required members - C# 11+)
/// <summary>
/// Persisted state of a single order saga instance driven by the order saga state machine.
/// </summary>
public class OrderSagaState : SagaStateMachineInstance
{
// Identifies the saga instance; the state machine copies it into every command it publishes.
public required Guid CorrelationId { get; set; }
// Name of the current state; bound by the state machine through InstanceState(x => x.CurrentState).
public required string CurrentState { get; set; }
// Order this saga coordinates; filled in from the order-created message.
public required Guid OrderId { get; set; }
// Set once the payment-completed message has been handled; null until then.
public Guid? PaymentId { get; set; }
// Set once the shipment-scheduled message has been handled; null until then.
public Guid? ShipmentId { get; set; }
// Defaults to the construction time and is overwritten when the order-created message is handled.
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
}
// Saga State Machine
/// <summary>
/// Orchestrates an order: order created -> payment -> shipment, publishing the command
/// for the next step after each transition.
/// </summary>
public class OrderSagaStateMachine : MassTransitStateMachine<OrderSagaState>
{
    // States (assigned by the MassTransit base class, hence the null! initializers).
    public State OrderCreated { get; private set; } = null!;
    public State PaymentProcessing { get; private set; } = null!;
    public State PaymentCompleted { get; private set; } = null!;
    public State ShipmentScheduled { get; private set; } = null!;
    public State Completed { get; private set; } = null!;
    public State Failed { get; private set; } = null!;

    // Events the saga reacts to.
    public Event<OrderCreatedEvent> OrderCreatedEvent { get; private set; } = null!;
    public Event<PaymentCompletedEvent> PaymentCompletedEvent { get; private set; } = null!;
    public Event<PaymentFailedEvent> PaymentFailedEvent { get; private set; } = null!;
    public Event<ShipmentScheduledEvent> ShipmentScheduledEvent { get; private set; } = null!;

    public OrderSagaStateMachine()
    {
        // The current state name is stored in OrderSagaState.CurrentState.
        InstanceState(saga => saga.CurrentState);

        // Order created: remember the order, then ask for payment.
        Initially(
            When(OrderCreatedEvent)
                .Then(ctx =>
                {
                    ctx.Saga.OrderId = ctx.Message.OrderId;
                    ctx.Saga.CreatedAt = DateTimeOffset.UtcNow;
                })
                .TransitionTo(PaymentProcessing)
                .PublishAsync(ctx => ctx.Init<ProcessPaymentCommand>(new
                {
                    OrderId = ctx.Message.OrderId,
                    TotalAmount = ctx.Message.TotalAmount,
                    CorrelationId = ctx.Saga.CorrelationId
                }))
        );

        // Waiting for the payment outcome.
        During(PaymentProcessing,
            // Success: record the payment and request shipment.
            When(PaymentCompletedEvent)
                .Then(ctx => ctx.Saga.PaymentId = ctx.Message.PaymentId)
                .TransitionTo(PaymentCompleted)
                .PublishAsync(ctx => ctx.Init<ScheduleShipmentCommand>(new
                {
                    OrderId = ctx.Saga.OrderId,
                    CorrelationId = ctx.Saga.CorrelationId
                })),
            // Failure: mark the saga failed and cancel the order.
            When(PaymentFailedEvent)
                .TransitionTo(Failed)
                .PublishAsync(ctx => ctx.Init<CancelOrderCommand>(new
                {
                    OrderId = ctx.Saga.OrderId,
                    Reason = "Payment failed",
                    CorrelationId = ctx.Saga.CorrelationId
                }))
        );

        // Payment done: the shipment confirmation ends the saga.
        During(PaymentCompleted,
            When(ShipmentScheduledEvent)
                .Then(ctx => ctx.Saga.ShipmentId = ctx.Message.ShipmentId)
                .TransitionTo(Completed)
                .Finalize()
        );

        // Finalized instances are treated as completed.
        SetCompletedWhenFinalized();
    }
}
// Registration with Azure Service Bus (in Program.cs)
builder.Services.AddMassTransit(bus =>
{
    // Saga instances are stored through EF Core with optimistic concurrency.
    var sagaRegistration = bus.AddSagaStateMachine<OrderSagaStateMachine, OrderSagaState>();
    sagaRegistration.EntityFrameworkRepository(repo =>
    {
        repo.ConcurrencyMode = ConcurrencyMode.Optimistic;
        repo.AddDbContext<DbContext, SagaDbContext>((_, dbOptions) =>
            dbOptions.UseSqlServer(builder.Configuration.GetConnectionString("SagaDb")));
    });

    // Azure Service Bus transport; endpoints are configured from the registrations above.
    bus.UsingAzureServiceBus((registration, serviceBus) =>
    {
        serviceBus.Host(builder.Configuration["AzureServiceBus:ConnectionString"]);
        serviceBus.ConfigureEndpoints(registration);
    });
});
Tools for Saga Pattern:
- MassTransit (v8.2+): State machine-based sagas with Azure Service Bus
- Dapr Workflow (v1.14+): Actor-based workflows with state management
- Azure Durable Functions: Serverless orchestration with built-in state persistence
- NServiceBus (v9+): Commercial option with advanced saga features
- Wolverine (v3+): Modern messaging with saga support
The Outbox Pattern
What is the Outbox Pattern?
Ensures reliable event publishing by saving events to a database table (the "outbox") in the same transaction as business data, then publishing them asynchronously. This guarantees at-least-once delivery.
Implementation with EF Core 9
// Outbox Message
/// <summary>
/// One row of the outbox table: a serialized event waiting to be published by the outbox processor.
/// </summary>
public sealed record OutboxMessage
{
// Unique identifier of the outbox row.
public required Guid Id { get; init; }
// Event type name; the order service stores the assembly-qualified CLR type name here.
public required string Type { get; init; }
// Serialized event body (the order service writes JSON).
public required string Payload { get; init; }
// When the row was written.
public required DateTimeOffset CreatedAt { get; init; }
// Set by the processor after a successful publish; null means the message is still pending.
public DateTimeOffset? ProcessedAt { get; set; }
// Number of failed publish attempts; the processor only selects rows with fewer than 5.
public int RetryCount { get; set; }
// Message of the most recent publish exception, if any.
public string? Error { get; set; }
}
// Service with Outbox
/// <summary>
/// Creates orders and writes the matching <c>OrderCreatedEvent</c> to the outbox in the
/// same database transaction, so the event is stored if and only if the order is.
/// </summary>
public class OrderService(AppDbContext context, ILogger<OrderService> logger)
{
    /// <summary>Persists a new order together with its outbox message.</summary>
    /// <param name="command">Customer and line items of the order to create.</param>
    /// <param name="ct">Token used to cancel the database work.</param>
    /// <remarks>Any failure is logged and rethrown to the caller.</remarks>
    public async Task CreateOrderAsync(CreateOrderCommand command, CancellationToken ct = default)
    {
        try
        {
            // Build and track the entities exactly once, OUTSIDE the retried delegate.
            // If they were created inside it, every execution-strategy retry would add
            // another Order/OutboxMessage pair to the change tracker and insert duplicates.
            var order = new Order(command.CustomerId, command.Items);
            context.Orders.Add(order);

            var evt = new OrderCreatedEvent(order.Id, order.CustomerId, order.Items);
            var outboxMessage = new OutboxMessage
            {
                Id = Guid.NewGuid(),
                Type = evt.GetType().AssemblyQualifiedName!,
                Payload = JsonSerializer.Serialize(evt, JsonSerializerOptions.Default),
                CreatedAt = DateTimeOffset.UtcNow
            };
            context.OutboxMessages.Add(outboxMessage);

            // A user-initiated transaction must run inside the execution strategy so the
            // whole unit (begin, save, commit) can be replayed on a transient failure.
            var strategy = context.Database.CreateExecutionStrategy();
            await strategy.ExecuteAsync(async () =>
            {
                // Disposing an uncommitted transaction rolls it back, so no explicit
                // rollback is needed (and none can mask the original exception).
                await using var transaction = await context.Database.BeginTransactionAsync(ct);

                // Keep the entities in the Added state until the commit is confirmed,
                // so a retry re-sends the same inserts instead of saving nothing.
                await context.SaveChangesAsync(acceptAllChangesOnSuccess: false, ct);
                await transaction.CommitAsync(ct);
            });
            context.ChangeTracker.AcceptAllChanges();

            logger.LogInformation("Order {OrderId} created with outbox message {MessageId}",
                order.Id, outboxMessage.Id);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to create order");
            throw;
        }
    }
}
// Outbox Processor using BackgroundService with PeriodicTimer (.NET 6+)
/// <summary>
/// Background service that polls the outbox every 5 seconds, publishes pending
/// messages, and deletes processed rows older than 7 days.
/// </summary>
public class OutboxProcessor(
    IServiceProvider serviceProvider,
    ILogger<OutboxProcessor> logger) : BackgroundService
{
    private readonly PeriodicTimer _timer = new(TimeSpan.FromSeconds(5));

    /// <summary>Runs one processing cycle per timer tick until the host stops.</summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("Outbox processor started");
        try
        {
            while (await _timer.WaitForNextTickAsync(stoppingToken))
            {
                try
                {
                    await ProcessOutboxMessagesAsync(stoppingToken);
                }
                catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
                {
                    // A failed cycle (for example, the database is briefly unavailable)
                    // must not end the service; the next tick tries again.
                    logger.LogError(ex, "Outbox processing cycle failed; retrying on next tick");
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host shutdown: leave the loop without reporting an error.
            logger.LogInformation("Outbox processor stopping");
        }
    }

    /// <summary>
    /// Publishes up to 100 pending messages (oldest first, fewer than 5 failed attempts),
    /// saves their outcome, then removes processed rows older than 7 days.
    /// </summary>
    private async Task ProcessOutboxMessagesAsync(CancellationToken ct)
    {
        await using var scope = serviceProvider.CreateAsyncScope();
        var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var publisher = scope.ServiceProvider.GetRequiredService<IEventPublisher>();

        // Load the next batch of pending messages as tracked entities so the
        // outcome of each publish can be written back below.
        var messages = await context.OutboxMessages
            .Where(m => m.ProcessedAt == null && m.RetryCount < 5)
            .OrderBy(m => m.CreatedAt)
            .Take(100)
            .ToListAsync(ct);

        foreach (var message in messages)
        {
            try
            {
                await publisher.PublishAsync(message.Type, message.Payload, ct);
                message.ProcessedAt = DateTimeOffset.UtcNow;
                logger.LogInformation("Published outbox message {MessageId}", message.Id);
            }
            catch (Exception ex) when (!ct.IsCancellationRequested)
            {
                // Only real publish failures count as a retry; a shutdown in the
                // middle of a send propagates instead of consuming an attempt.
                message.RetryCount++;
                message.Error = ex.Message;
                logger.LogError(ex, "Failed to publish message {MessageId}, retry {RetryCount}",
                    message.Id, message.RetryCount);
            }
        }

        // Outcomes are saved once per batch; a crash before this point republishes
        // the batch, which is the at-least-once behaviour of the pattern.
        await context.SaveChangesAsync(ct);

        // Clean up old processed messages (older than 7 days) with a set-based delete.
        var cutoff = DateTimeOffset.UtcNow.AddDays(-7);
        await context.OutboxMessages
            .Where(m => m.ProcessedAt != null && m.ProcessedAt < cutoff)
            .ExecuteDeleteAsync(ct);
    }

    /// <summary>Releases the timer together with the base service resources.</summary>
    public override void Dispose()
    {
        _timer.Dispose();
        base.Dispose();
    }
}
Azure Integration
// Using Azure Service Bus with Outbox
/// <summary>
/// <see cref="IEventPublisher"/> that sends events to the "events" Service Bus entity.
/// </summary>
/// <remarks>
/// The sender is created once and reused: creating a sender per message and never
/// disposing it leaks AMQP links. The sender is released in <see cref="DisposeAsync"/>.
/// </remarks>
public class AzureServiceBusPublisher(ServiceBusClient client) : IEventPublisher, IAsyncDisposable
{
    private readonly ServiceBusSender _sender = client.CreateSender("events");

    /// <summary>Sends one event as a JSON Service Bus message.</summary>
    /// <param name="eventType">Event type name; sent as the message subject.</param>
    /// <param name="payload">Serialized event body.</param>
    /// <param name="ct">Token used to cancel the send.</param>
    public async Task PublishAsync(string eventType, string payload, CancellationToken ct = default)
    {
        var message = new ServiceBusMessage(payload)
        {
            ContentType = "application/json",
            Subject = eventType,
            // NOTE(review): a fresh id per send means broker-side duplicate detection
            // cannot collapse outbox redeliveries; passing the outbox row id through
            // IEventPublisher would be needed for that.
            MessageId = Guid.NewGuid().ToString()
        };
        await _sender.SendMessageAsync(message, ct);
    }

    /// <summary>Closes the cached sender.</summary>
    public ValueTask DisposeAsync() => _sender.DisposeAsync();
}
// Registration
// One ServiceBusClient for the whole application, built from configuration on first use.
builder.Services.AddSingleton(_ =>
    new ServiceBusClient(builder.Configuration["AzureServiceBus:ConnectionString"]!));

// The publisher is a singleton; the outbox processor runs as a hosted service.
builder.Services.AddSingleton<IEventPublisher, AzureServiceBusPublisher>();
builder.Services.AddHostedService<OutboxProcessor>();
Tools:
- Entity Framework Core 9: ExecuteUpdate/ExecuteDelete (available since EF Core 7), complex types (available since EF Core 8), improved performance
- MassTransit (v8.2+): Built-in outbox with automatic publishing
- Wolverine: Transactional messaging with PostgreSQL
- Azure Service Bus: Reliable message delivery
- Azure SQL Database: Outbox storage with geo-replication
Eventual Consistency
What is Eventual Consistency?
In distributed systems, data across services may be temporarily inconsistent but will eventually converge to a consistent state. This is a trade-off for availability and partition tolerance (CAP theorem).
Managing with CQRS Read Models
// Read model with complex type (EF Core 8+)
/// <summary>
/// Denormalized, query-side view of an order, kept up to date by event handlers.
/// </summary>
public sealed record OrderReadModel
{
    /// <summary>Identifier of the order this view describes.</summary>
    public required Guid OrderId { get; init; }

    /// <summary>Current order status text (for example "Created", "Paid", "Shipped").</summary>
    public required string Status { get; set; }

    /// <summary>Order total.</summary>
    public required decimal TotalAmount { get; init; }

    /// <summary>Current payment status text (for example "Pending", "Completed").</summary>
    public required string PaymentStatus { get; set; }

    /// <summary>Address the order ships to.</summary>
    public required Address ShippingAddress { get; init; }

    /// <summary>When the view was last modified.</summary>
    public required DateTimeOffset LastUpdated { get; set; }

    /// <summary>
    /// When the view was created. Defaults to the construction time.
    /// </summary>
    /// <remarks>
    /// This was previously a private init-only property that nothing could assign, which
    /// left it at <c>default(DateTimeOffset)</c> and made <see cref="ProcessingTime"/>
    /// meaningless. It is public so creators and serializers can set and round-trip it.
    /// </remarks>
    public DateTimeOffset CreatedAt { get; init; } = DateTimeOffset.UtcNow;

    /// <summary>
    /// Time from creation to the last update once the order is "Completed"; otherwise null.
    /// </summary>
    public TimeSpan? ProcessingTime =>
        Status == "Completed" ? LastUpdated - CreatedAt : null;
}
// Complex type (EF Core 8+)
/// <summary>
/// Postal address value; used as the shipping address of the order read model.
/// </summary>
public record Address
{
// Street line of the address.
public required string Street { get; init; }
// City of the address.
public required string City { get; init; }
// Postal code of the address.
public required string PostalCode { get; init; }
}
// Event handlers with primary constructor
/// <summary>
/// Projects order, payment and shipment events onto the <see cref="OrderReadModel"/>.
/// </summary>
public class OrderReadModelUpdater(
    IRepository<OrderReadModel> repository,
    ILogger<OrderReadModelUpdater> logger) :
    IEventHandler<OrderCreatedEvent>,
    IEventHandler<PaymentCompletedEvent>,
    IEventHandler<ShipmentScheduledEvent>
{
    /// <summary>Creates (or overwrites) the read model for a newly created order.</summary>
    public async Task Handle(OrderCreatedEvent evt, CancellationToken ct = default)
    {
        var shipTo = new Address
        {
            Street = evt.ShippingAddress.Street,
            City = evt.ShippingAddress.City,
            PostalCode = evt.ShippingAddress.PostalCode
        };

        await repository.UpsertAsync(
            new OrderReadModel
            {
                OrderId = evt.OrderId,
                Status = "Created",
                TotalAmount = evt.TotalAmount,
                PaymentStatus = "Pending",
                ShippingAddress = shipTo,
                LastUpdated = DateTimeOffset.UtcNow
            },
            ct);

        logger.LogInformation("Order read model created for {OrderId}", evt.OrderId);
    }

    /// <summary>Marks the order as paid.</summary>
    public async Task Handle(PaymentCompletedEvent evt, CancellationToken ct = default)
    {
        await PatchAsync(
            evt.OrderId,
            view =>
            {
                view.PaymentStatus = "Completed";
                view.Status = "Paid";
            },
            ct);
    }

    /// <summary>Marks the order as shipped.</summary>
    public async Task Handle(ShipmentScheduledEvent evt, CancellationToken ct = default)
    {
        await PatchAsync(evt.OrderId, view => view.Status = "Shipped", ct);
    }

    // Loads the read model, applies the change, stamps LastUpdated and saves it.
    // A missing read model is logged as a warning and otherwise ignored.
    private async Task PatchAsync(Guid orderId, Action<OrderReadModel> change, CancellationToken ct)
    {
        var view = await repository.GetByIdAsync(orderId, ct);
        if (view is null)
        {
            logger.LogWarning("Order {OrderId} not found in read model", orderId);
            return;
        }

        change(view);
        view.LastUpdated = DateTimeOffset.UtcNow;
        await repository.UpdateAsync(view, ct);
    }
}
Azure Cosmos DB for Read Models
// Using Cosmos DB for eventual consistency scenarios
/// <summary>
/// Cosmos DB backed store for <see cref="OrderReadModel"/> documents, partitioned by order id.
/// </summary>
/// <remarks>
/// NOTE(review): reads look documents up by <c>orderId.ToString()</c> as the item id, so the
/// stored document presumably needs an <c>id</c> field with that value — confirm the
/// serializer mapping for <see cref="OrderReadModel"/>.
/// </remarks>
public class CosmosOrderReadModelRepository(CosmosClient client) : IRepository<OrderReadModel>
{
    private readonly Container _container = client
        .GetContainer("OrdersDb", "OrderReadModels");

    /// <summary>Creates the document, or replaces it if it already exists.</summary>
    public async Task UpsertAsync(OrderReadModel model, CancellationToken ct = default)
    {
        await _container.UpsertItemAsync(
            model,
            new PartitionKey(model.OrderId.ToString()),
            cancellationToken: ct);
    }

    /// <summary>
    /// Saves changes to an existing read model. Added because the read-model updater
    /// calls <c>UpdateAsync</c> on this repository abstraction; it writes the document
    /// with the same upsert used for creation.
    /// </summary>
    public Task UpdateAsync(OrderReadModel model, CancellationToken ct = default)
        => UpsertAsync(model, ct);

    /// <summary>Reads the document for an order.</summary>
    /// <returns>The read model, or null when Cosmos DB reports 404 Not Found.</returns>
    public async Task<OrderReadModel?> GetByIdAsync(Guid orderId, CancellationToken ct = default)
    {
        var id = orderId.ToString();
        try
        {
            var response = await _container.ReadItemAsync<OrderReadModel>(
                id,
                new PartitionKey(id),
                cancellationToken: ct);
            return response.Resource;
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
        {
            // A missing document is an expected outcome, not an error.
            return null;
        }
    }
}
Idempotent Handlers
What is Idempotency?
An operation that produces the same result no matter how many times it's executed. Critical for handling message redeliveries and retries.
Implementation with Database Tracking
// Using record for processed message tracking
/// <summary>
/// Marker row recording that a message has already been handled; used to skip redeliveries.
/// </summary>
public sealed record ProcessedMessage
{
// Identifier of the handled message; the idempotent handler looks rows up by this value.
public required string MessageId { get; init; }
// When the message was handled.
public required DateTimeOffset ProcessedAt { get; init; }
// Name of the handler that processed the message.
public required string HandlerType { get; init; }
}
// Idempotent event handler
/// <summary>
/// Handles <c>OrderCreatedEvent</c> at most once per message id by recording a
/// <see cref="ProcessedMessage"/> row in the same transaction as the inventory change.
/// </summary>
public class IdempotentEventHandler(AppDbContext context, ILogger<IdempotentEventHandler> logger)
{
    /// <summary>Reserves inventory for the event unless the message was already handled.</summary>
    /// <param name="evt">Event carrying the product and quantity to reserve.</param>
    /// <param name="messageId">Transport message id used as the de-duplication key.</param>
    /// <param name="ct">Token used to cancel the database work.</param>
    /// <remarks>Failures are logged and rethrown so the message can be redelivered.</remarks>
    public async Task Handle(OrderCreatedEvent evt, string messageId, CancellationToken ct = default)
    {
        var strategy = context.Database.CreateExecutionStrategy();
        await strategy.ExecuteAsync(async () =>
        {
            // Start every attempt from a clean change tracker. Otherwise a retry would
            // call Reserve again on the already-modified tracked entity and try to add
            // a second ProcessedMessage with the same key.
            context.ChangeTracker.Clear();

            // Disposing an uncommitted transaction rolls it back.
            await using var transaction = await context.Database.BeginTransactionAsync(ct);
            try
            {
                // The duplicate check runs inside the transaction, next to the write it guards.
                // NOTE(review): two concurrent deliveries can still both pass this check;
                // a unique key on ProcessedMessage.MessageId (not visible here) is what
                // makes the second commit fail — confirm the model configuration.
                var alreadyProcessed = await context.ProcessedMessages
                    .AnyAsync(m => m.MessageId == messageId, ct);
                if (alreadyProcessed)
                {
                    logger.LogInformation("Message {MessageId} already processed, skipping", messageId);
                    return;
                }

                // Process event
                var inventory = await context.Inventory
                    .FirstAsync(i => i.ProductId == evt.ProductId, ct);
                inventory.Reserve(evt.Quantity);

                // Record message as processed
                context.ProcessedMessages.Add(new ProcessedMessage
                {
                    MessageId = messageId,
                    ProcessedAt = DateTimeOffset.UtcNow,
                    HandlerType = nameof(IdempotentEventHandler)
                });

                await context.SaveChangesAsync(ct);
                await transaction.CommitAsync(ct);
                logger.LogInformation("Message {MessageId} processed successfully", messageId);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to process message {MessageId}", messageId);
                throw;
            }
        });
    }
}
Azure Redis Cache for Idempotency
// Using Azure Redis for distributed idempotency tracking
/// <summary>
/// Redis-backed guard that runs a handler at most once per message id.
/// </summary>
/// <remarks>
/// The key is first claimed with a short lease and only promoted to the 7-day
/// "processed" marker after the handler succeeds. Writing the long-lived marker up
/// front would mean a process crash mid-handler leaves the message marked as processed
/// without ever having been handled.
/// </remarks>
public class RedisIdempotencyChecker(
    IConnectionMultiplexer redis,
    ILogger<RedisIdempotencyChecker> logger)
{
    // How long an in-flight claim blocks other consumers if the owner dies.
    // Handlers are assumed to finish well within this window.
    private static readonly TimeSpan InProgressLease = TimeSpan.FromMinutes(5);

    // How long a completed message id is remembered.
    private static readonly TimeSpan ProcessedRetention = TimeSpan.FromDays(7);

    private readonly IDatabase _db = redis.GetDatabase();

    /// <summary>Runs <paramref name="handler"/> unless the message id is already claimed.</summary>
    /// <param name="messageId">De-duplication key of the message.</param>
    /// <param name="handler">Work to perform for a first-time message.</param>
    /// <param name="ct">Checked before any work starts.</param>
    /// <returns>True if the handler ran; false if the id was already processed or in flight.</returns>
    /// <remarks>Handler exceptions are rethrown after the claim is released.</remarks>
    public async Task<bool> TryProcessAsync(
        string messageId,
        Func<Task> handler,
        CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        var key = $"processed:{messageId}";

        // Claim the key with NX (only if not exists) and a short lease.
        var claimed = await _db.StringSetAsync(
            key,
            "in-progress",
            expiry: InProgressLease,
            when: When.NotExists);
        if (!claimed)
        {
            logger.LogInformation("Message {MessageId} already processed", messageId);
            return false;
        }

        try
        {
            await handler();
        }
        catch
        {
            // Release the claim so a redelivery can retry. A cleanup failure must not
            // hide the handler's exception; the lease expires on its own anyway.
            try
            {
                await _db.KeyDeleteAsync(key);
            }
            catch (Exception cleanupError)
            {
                logger.LogWarning(cleanupError,
                    "Could not release idempotency key for message {MessageId}", messageId);
            }
            throw;
        }

        // Promote the claim to the long-lived processed marker.
        await _db.StringSetAsync(
            key,
            DateTimeOffset.UtcNow.ToString("O"),
            expiry: ProcessedRetention,
            when: When.Always);
        return true;
    }
}
// Usage
/// <summary>
/// Handles order-created events through the Redis idempotency guard.
/// </summary>
public class OrderEventHandler(
    RedisIdempotencyChecker idempotencyChecker,
    IOrderService orderService)
{
    /// <summary>Processes the event unless the guard reports the message id as already seen.</summary>
    public async Task Handle(OrderCreatedEvent evt, string messageId, CancellationToken ct)
    {
        // The actual work, run by the guard only for a first-time message.
        Task ProcessAsync() => orderService.ProcessOrderAsync(evt, ct);

        // Whether the handler ran or was skipped is not needed here.
        _ = await idempotencyChecker.TryProcessAsync(messageId, ProcessAsync, ct);
    }
}
// Registration with Azure Cache for Redis
// A single shared multiplexer, connected on first resolution from the configured connection string.
builder.Services.AddSingleton<IConnectionMultiplexer>(_ =>
    ConnectionMultiplexer.Connect(builder.Configuration["Azure:Redis:ConnectionString"]!));

// The checker is registered as a singleton on top of the shared multiplexer.
builder.Services.AddSingleton<RedisIdempotencyChecker>();
Natural Idempotency with HTTP
// API endpoint with natural idempotency
/// <summary>
/// Order endpoints demonstrating two ways to make creation safe to retry:
/// a client-chosen id with PUT, and an Idempotency-Key header with POST.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController(IOrderService orderService) : ControllerBase
{
    /// <summary>Creates or updates the order with the client-supplied id.</summary>
    /// <returns>201 Created for a new order, 200 OK for an existing one.</returns>
    [HttpPut("{orderId:guid}")]
    public async Task<IActionResult> CreateOrder(
        Guid orderId,
        [FromBody] CreateOrderRequest request,
        CancellationToken ct)
    {
        // PUT to a known id: repeating the call targets the same order.
        var outcome = await orderService.CreateOrUpdateOrderAsync(orderId, request, ct);
        if (outcome.IsNew)
        {
            return CreatedAtAction(nameof(GetOrder), new { orderId }, outcome.Order);
        }

        return Ok(outcome.Order);
    }

    /// <summary>Creates an order once per Idempotency-Key header value.</summary>
    /// <returns>
    /// 400 Bad Request when the header is missing or empty; 201 Created for a new
    /// order; 200 OK when the key was seen before.
    /// </returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrderWithIdempotencyKey(
        [FromBody] CreateOrderRequest request,
        [FromHeader(Name = "Idempotency-Key")] string? idempotencyKey,
        CancellationToken ct)
    {
        if (string.IsNullOrEmpty(idempotencyKey))
        {
            return BadRequest("Idempotency-Key header is required");
        }

        var outcome = await orderService.CreateOrderIdempotentAsync(idempotencyKey, request, ct);
        if (outcome.IsNew)
        {
            return CreatedAtAction(nameof(GetOrder), new { orderId = outcome.OrderId }, outcome.Order);
        }

        return Ok(outcome.Order);
    }

    /// <summary>Returns the order, or 404 Not Found when it does not exist.</summary>
    [HttpGet("{orderId:guid}")]
    public async Task<IActionResult> GetOrder(Guid orderId, CancellationToken ct)
    {
        var found = await orderService.GetOrderAsync(orderId, ct);
        if (found is null)
        {
            return NotFound();
        }

        return Ok(found);
    }
}
Compensating Actions
What are Compensating Actions?
Operations that undo the effects of previously completed transactions in a saga. Essential for maintaining consistency when a distributed transaction fails partway through.
Read more: Compensating Transaction pattern - Microsoft Azure Architecture
Implementation Example
// Payment service with compensation
/// <summary>
/// Charges payments through the gateway and offers the matching compensating refund.
/// </summary>
public class PaymentService(
    IPaymentGateway paymentGateway,
    ILogger<PaymentService> logger)
{
    /// <summary>Charges <paramref name="amount"/> for the given order.</summary>
    /// <returns>A successful result carrying the gateway's payment and transaction ids.</returns>
    public async Task<PaymentResult> ProcessPaymentAsync(
        Guid orderId,
        decimal amount,
        CancellationToken ct = default)
    {
        logger.LogInformation("Processing payment for order {OrderId}, amount {Amount}",
            orderId, amount);

        var charge = await paymentGateway.ChargeAsync(amount, ct);

        var outcome = new PaymentResult
        {
            Success = true,
            PaymentId = charge.Id,
            TransactionId = charge.TransactionId
        };
        return outcome;
    }

    /// <summary>Compensating action: refunds a previously taken payment.</summary>
    /// <param name="paymentId">Payment to refund.</param>
    /// <param name="reason">Why the refund is issued; only written to the log.</param>
    /// <param name="ct">Token passed to the gateway call.</param>
    public async Task RefundPaymentAsync(
        Guid paymentId,
        string reason,
        CancellationToken ct = default)
    {
        logger.LogWarning("Refunding payment {PaymentId}, reason: {Reason}",
            paymentId, reason);

        await paymentGateway.RefundAsync(paymentId, ct);
    }
}
// Inventory service with compensation
/// <summary>
/// Reserves stock for orders and offers the matching compensating release.
/// </summary>
public class InventoryService(
    IRepository<Inventory> repository,
    ILogger<InventoryService> logger)
{
    /// <summary>Reserves every line of the order, then saves once.</summary>
    /// <exception cref="InsufficientInventoryException">
    /// A product is unknown or cannot cover the requested quantity; nothing is saved.
    /// </exception>
    public async Task ReserveInventoryAsync(
        Guid orderId,
        IReadOnlyList<OrderLine> items,
        CancellationToken ct = default)
    {
        logger.LogInformation("Reserving inventory for order {OrderId}", orderId);

        foreach (var line in items)
        {
            var stock = await repository.GetAsync(line.ProductId, ct);
            if (stock is not null && stock.CanReserve(line.Quantity))
            {
                stock.Reserve(orderId, line.Quantity);
                continue;
            }

            throw new InsufficientInventoryException(line.ProductId, line.Quantity);
        }

        await repository.SaveAsync(ct);
    }

    /// <summary>Compensating action: releases everything reserved for the order.</summary>
    public async Task ReleaseInventoryAsync(Guid orderId, CancellationToken ct = default)
    {
        logger.LogWarning("Releasing inventory for order {OrderId}", orderId);

        var held = await repository.GetReservationsByOrderAsync(orderId, ct);
        foreach (var reservation in held)
        {
            var stock = await repository.GetAsync(reservation.ProductId, ct);
            if (stock is not null)
            {
                stock.Release(orderId);
            }
        }

        await repository.SaveAsync(ct);
    }
}
// Saga with compensating actions
/// <summary>
/// Order saga excerpt showing the compensation flows: what is published to undo
/// earlier steps when payment or inventory reservation fails.
/// </summary>
public class OrderSagaWithCompensation : MassTransitStateMachine<OrderSagaState>
{
    // States referenced by the compensation flows below. They must be declared on the
    // state machine; the excerpt previously used them without declaring them.
    public State PaymentProcessing { get; private set; } = null!;
    public State InventoryReservation { get; private set; } = null!;
    public State Compensating { get; private set; } = null!;
    public State Failed { get; private set; } = null!;

    public Event<OrderCreatedEvent> OrderCreated { get; private set; } = null!;
    public Event<PaymentFailedEvent> PaymentFailed { get; private set; } = null!;
    public Event<InventoryReservationFailedEvent> InventoryFailed { get; private set; } = null!;

    public OrderSagaWithCompensation()
    {
        // ... other state configurations ...

        // Compensation flow when payment fails:
        // release the reserved inventory and cancel the order.
        During(PaymentProcessing,
            When(PaymentFailed)
                .Then(ctx => Log.Warning("Payment failed for order {OrderId}", ctx.Saga.OrderId))
                .TransitionTo(Compensating)
                .PublishAsync(ctx => ctx.Init<ReleaseInventoryCommand>(new
                {
                    OrderId = ctx.Saga.OrderId,
                    Reason = "Payment failed"
                }))
                .PublishAsync(ctx => ctx.Init<CancelOrderCommand>(new
                {
                    OrderId = ctx.Saga.OrderId,
                    Reason = "Payment failed"
                }))
                .TransitionTo(Failed)
                .Finalize()
        );

        // Compensation flow when inventory reservation fails:
        // refund the payment if one was taken, then cancel the order.
        During(InventoryReservation,
            When(InventoryFailed)
                .Then(ctx => Log.Warning("Inventory reservation failed for order {OrderId}",
                    ctx.Saga.OrderId))
                .TransitionTo(Compensating)
                // Only refund when a payment id was recorded on the saga.
                .If(ctx => ctx.Saga.PaymentId.HasValue,
                    compensate => compensate
                        .PublishAsync(ctx => ctx.Init<RefundPaymentCommand>(new
                        {
                            PaymentId = ctx.Saga.PaymentId!.Value,
                            Reason = "Inventory unavailable"
                        })))
                .PublishAsync(ctx => ctx.Init<CancelOrderCommand>(new
                {
                    OrderId = ctx.Saga.OrderId,
                    Reason = "Inventory unavailable"
                }))
                .TransitionTo(Failed)
                .Finalize()
        );
    }
}
Azure Durable Functions for Compensation
/// <summary>
/// Durable orchestration for an order: reserve inventory, take payment, schedule
/// shipment. Each completed step registers an undo action that runs if a later step fails.
/// </summary>
[DurableTask(nameof(OrderOrchestrator))]
public class OrderOrchestrator : TaskOrchestrator<OrderResult, OrderRequest>
{
    /// <summary>Runs the three order steps and compensates on failure.</summary>
    /// <returns>A success result for the order, or a failed result with the reason.</returns>
    public override async Task<OrderResult> RunAsync(
        TaskOrchestrationContext context,
        OrderRequest input)
    {
        // Undo actions for the steps completed so far, most recent on top.
        var undoStack = new Stack<Func<Task>>();

        try
        {
            // Step 1: Reserve inventory
            var reserved = await context.CallActivityAsync<bool>(
                nameof(ReserveInventoryActivity),
                input.OrderId);
            if (!reserved)
            {
                return OrderResult.Failed("Inventory unavailable");
            }

            undoStack.Push(() => context.CallActivityAsync(
                nameof(ReleaseInventoryActivity),
                input.OrderId));

            // Step 2: Process payment
            var payment = await context.CallActivityAsync<PaymentResult>(
                nameof(ProcessPaymentActivity),
                input);
            if (payment.Success)
            {
                undoStack.Push(() => context.CallActivityAsync(
                    nameof(RefundPaymentActivity),
                    payment.PaymentId));

                // Step 3: Schedule shipment
                await context.CallActivityAsync(
                    nameof(ScheduleShipmentActivity),
                    input.OrderId);

                return OrderResult.Success(input.OrderId);
            }

            // Payment was declined: undo the inventory reservation.
            await ExecuteCompensationsAsync(context, undoStack);
            return OrderResult.Failed("Payment failed");
        }
        catch (Exception ex)
        {
            // Any step threw: undo whatever completed, newest first.
            await ExecuteCompensationsAsync(context, undoStack);
            return OrderResult.Failed(ex.Message);
        }
    }

    /// <summary>
    /// Runs the registered undo actions in reverse order of registration. A failing
    /// action is reported through the custom status and does not stop the remaining ones.
    /// </summary>
    private static async Task ExecuteCompensationsAsync(
        TaskOrchestrationContext context,
        Stack<Func<Task>> compensations)
    {
        while (compensations.TryPop(out var undo))
        {
            try
            {
                await undo();
            }
            catch (Exception ex)
            {
                // Record the failure but keep going with the other compensations.
                context.SetCustomStatus($"Compensation failed: {ex.Message}");
            }
        }
    }
}
Additional Patterns and Best Practices
Circuit Breaker with Polly
// Using Polly v8+ with resilience pipelines
// The handler builds a pipeline for HttpResponseMessage results, so it needs the HTTP
// option types (HttpRetryStrategyOptions / HttpCircuitBreakerStrategyOptions from the
// Microsoft.Extensions.Http.Resilience namespace). The non-generic Polly option types
// do not fit this builder, and the HTTP types come with defaults for transient HTTP failures.
//
// Strategies run in the order they are added, outermost first:
//   retry -> circuit breaker -> timeout
// so each attempt gets its own 10 s timeout, the breaker observes every attempt, and
// the retry wraps both. With retry added last, the breaker and timeout would instead
// wrap the whole retry sequence.
//
// NOTE(review): retrying payment calls is only safe if the payment API is idempotent
// (for example via an idempotency key) — confirm before enabling retries on charges.
builder.Services.AddHttpClient<IPaymentService, PaymentService>()
    .AddResilienceHandler("payment-resilience", static pipeline =>
    {
        pipeline
            .AddRetry(new HttpRetryStrategyOptions
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromSeconds(1),
                BackoffType = DelayBackoffType.Exponential
            })
            .AddCircuitBreaker(new HttpCircuitBreakerStrategyOptions
            {
                FailureRatio = 0.5,
                SamplingDuration = TimeSpan.FromSeconds(30),
                MinimumThroughput = 10,
                BreakDuration = TimeSpan.FromSeconds(30)
            })
            .AddTimeout(TimeSpan.FromSeconds(10));
    });
Monitoring with Azure Application Insights
// Track saga execution
/// <summary>
/// Excerpt of the order saga showing how to emit an Application Insights custom event
/// when a saga instance starts.
/// </summary>
public class OrderSagaStateMachine : MassTransitStateMachine<OrderSagaState>
{
    public OrderSagaStateMachine(TelemetryClient telemetry)
    {
        Initially(
            // The trigger is the OrderCreatedEvent event. In the full state machine
            // OrderCreated is a State, which When(...) does not accept.
            When(OrderCreatedEvent)
                .Then(context =>
                {
                    // Both ids are attached so the event can be matched to the order
                    // and to the saga instance.
                    telemetry.TrackEvent("OrderSagaStarted", new Dictionary<string, string>
                    {
                        ["OrderId"] = context.Message.OrderId.ToString(),
                        ["CorrelationId"] = context.Saga.CorrelationId.ToString()
                    });
                })
            // ... rest of saga logic
        );
    }
}
Summary
These patterns work together to handle distributed transactions in microservices:
- Saga Pattern: Coordinates multi-step transactions across services
- Outbox Pattern: Ensures reliable event publishing
- Eventual Consistency: Accepts temporary inconsistency in exchange for availability
- Idempotent Handlers: Ensure operations can be safely retried
- Compensating Actions: Roll back completed steps when failures occur
Advanced Topics
Event Sourcing with Marten
Event sourcing stores all changes as a sequence of events, providing a complete audit trail and enabling temporal queries.
// Event-sourced aggregate
// Events of the order stream; each one is an immutable fact about a single order.

/// <summary>An order was placed by a customer for the given total.</summary>
public record OrderCreated(Guid OrderId, string CustomerId, decimal Total);

/// <summary>A payment was taken for the order.</summary>
public record PaymentProcessed(Guid OrderId, Guid PaymentId, decimal Amount);

/// <summary>The order was handed to shipping under a tracking number.</summary>
public record OrderShipped(Guid OrderId, string TrackingNumber);

/// <summary>The order was cancelled for the stated reason.</summary>
public record OrderCancelled(Guid OrderId, string Reason);

/// <summary>
/// Order aggregate whose state is rebuilt by applying its events in order.
/// </summary>
public class Order
{
    public Guid Id { get; private set; }
    public string CustomerId { get; private set; } = string.Empty;
    public decimal Total { get; private set; }
    public string Status { get; private set; } = "Created";
    public Guid? PaymentId { get; private set; }
    public string? TrackingNumber { get; private set; }

    /// <summary>Initializes identity, customer and total; status becomes "Created".</summary>
    public void Apply(OrderCreated evt) =>
        (Id, CustomerId, Total, Status) = (evt.OrderId, evt.CustomerId, evt.Total, "Created");

    /// <summary>Records the payment id; status becomes "Paid".</summary>
    public void Apply(PaymentProcessed evt) =>
        (PaymentId, Status) = (evt.PaymentId, "Paid");

    /// <summary>Records the tracking number; status becomes "Shipped".</summary>
    public void Apply(OrderShipped evt) =>
        (TrackingNumber, Status) = (evt.TrackingNumber, "Shipped");

    /// <summary>Status becomes "Cancelled"; the event's reason is not stored.</summary>
    public void Apply(OrderCancelled evt) => Status = "Cancelled";
}
// Using Marten for event sourcing
/// <summary>
/// Event-sourced order operations on top of a Marten document session:
/// orders are written as events and read back by aggregating their stream.
/// </summary>
public class OrderService(IDocumentSession session)
{
    /// <summary>Starts a new order stream with an <see cref="OrderCreated"/> event.</summary>
    /// <returns>The id generated for the new order.</returns>
    public async Task<Guid> CreateOrderAsync(
        string customerId,
        decimal total,
        CancellationToken ct = default)
    {
        var newOrderId = Guid.NewGuid();

        session.Events.StartStream<Order>(newOrderId, new OrderCreated(newOrderId, customerId, total));
        await session.SaveChangesAsync(ct);

        return newOrderId;
    }

    /// <summary>Appends a <see cref="PaymentProcessed"/> event to the order's stream.</summary>
    public async Task ProcessPaymentAsync(
        Guid orderId,
        Guid paymentId,
        decimal amount,
        CancellationToken ct = default)
    {
        session.Events.Append(orderId, new PaymentProcessed(orderId, paymentId, amount));
        await session.SaveChangesAsync(ct);
    }

    /// <summary>Rebuilds the order from its event stream.</summary>
    public async Task<Order?> GetOrderAsync(Guid orderId, CancellationToken ct = default)
    {
        var current = await session.Events.AggregateStreamAsync<Order>(orderId, token: ct);
        return current;
    }

    /// <summary>Rebuilds the order from its event stream as of <paramref name="timestamp"/> (passed to Marten as UTC).</summary>
    public async Task<Order?> GetOrderAtTimeAsync(
        Guid orderId,
        DateTimeOffset timestamp,
        CancellationToken ct = default)
    {
        var asOfUtc = timestamp.UtcDateTime;
        var snapshot = await session.Events.AggregateStreamAsync<Order>(
            orderId,
            timestamp: asOfUtc,
            token: ct);
        return snapshot;
    }
}
// Marten configuration with PostgreSQL
builder.Services
    .AddMarten(store =>
    {
        // PostgreSQL connection used by the document and event store.
        store.Connection(builder.Configuration.GetConnectionString("PostgreSQL")!);

        // Event streams are identified by Guid.
        store.Events.StreamIdentity = StreamIdentity.AsGuid;

        // The order projection is registered with the asynchronous lifecycle.
        store.Projections.Add<OrderProjection>(ProjectionLifecycle.Async);
    })
    .UseLightweightSessions()
    .OptimizeArtifactWorkflow();
Distributed Tracing with OpenTelemetry
// Configure OpenTelemetry for distributed tracing
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing
            // Built-in instrumentation for inbound HTTP, outbound HTTP and EF Core.
            .AddAspNetCoreInstrumentation()
            .AddHttpClientInstrumentation()
            .AddEntityFrameworkCoreInstrumentation()
            // Spans emitted by MassTransit itself.
            .AddSource("MassTransit")
            // Spans from the saga's own ActivitySource ("OrderSaga"). Activities from a
            // source that is not registered here are never exported.
            .AddSource("OrderSaga")
            .AddAzureMonitorTraceExporter(options =>
            {
                options.ConnectionString = builder.Configuration["ApplicationInsights:ConnectionString"];
            });
    })
    .WithMetrics(metrics =>
    {
        metrics
            .AddAspNetCoreInstrumentation()
            .AddHttpClientInstrumentation()
            // Metrics published by MassTransit.
            .AddMeter("MassTransit")
            .AddAzureMonitorMetricExporter(options =>
            {
                options.ConnectionString = builder.Configuration["ApplicationInsights:ConnectionString"];
            });
    });
// Custom tracing in saga
/// <summary>
/// Excerpt of the order saga showing a custom span around the order-created step.
/// </summary>
public class OrderSagaStateMachine : MassTransitStateMachine<OrderSagaState>
{
    // Source for the saga's own spans.
    private static readonly ActivitySource ActivitySource = new("OrderSaga");

    public OrderSagaStateMachine()
    {
        Initially(
            When(OrderCreatedEvent)
                .Then(ctx =>
                {
                    // The span covers the state update below and ends when this block exits.
                    // StartActivity returns null when nothing is listening, hence the ?. calls.
                    using (var span = ActivitySource.StartActivity("ProcessOrderCreated"))
                    {
                        span?.SetTag("order.id", ctx.Message.OrderId);
                        span?.SetTag("saga.correlation_id", ctx.Saga.CorrelationId);

                        ctx.Saga.OrderId = ctx.Message.OrderId;
                        ctx.Saga.CreatedAt = DateTimeOffset.UtcNow;
                    }
                })
                .TransitionTo(PaymentProcessing)
                .PublishAsync(ctx => ctx.Init<ProcessPaymentCommand>(new
                {
                    OrderId = ctx.Message.OrderId,
                    TotalAmount = ctx.Message.TotalAmount,
                    CorrelationId = ctx.Saga.CorrelationId
                }))
        );
    }
}
Inbox Pattern (Complement to Outbox)
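The inbox pattern gives effectively exactly-once processing of incoming messages: each message ID is recorded in an inbox table in the same transaction as the handler's work, so redelivered duplicates are detected and skipped.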
// Inbox message tracking
/// <summary>
/// One row of the inbox table: a received message and whether it has been processed.
/// </summary>
public sealed record InboxMessage
{
// Identifier of the received message; the consumer looks rows up by this value.
public required string MessageId { get; init; }
// Where the message came from; the consumer stores the source address or "unknown".
public required string Source { get; init; }
// When the message was first recorded.
public required DateTimeOffset ReceivedAt { get; init; }
// Set when processing succeeded; a non-null value makes the consumer skip redeliveries.
public DateTimeOffset? ProcessedAt { get; set; }
// "Pending" by default; the consumer sets "Processed" or "Failed".
public string? Status { get; set; } = "Pending";
}
// Message handler with inbox
/// <summary>
/// MassTransit consumer that processes <c>OrderCreatedEvent</c> through an inbox table,
/// so a redelivered message that was already processed is skipped.
/// </summary>
public class OrderEventConsumer(
    AppDbContext context,
    IOrderService orderService,
    ILogger<OrderEventConsumer> logger) : IConsumer<OrderCreatedEvent>
{
    /// <summary>
    /// Records the message in the inbox, processes it and marks it processed, all in
    /// one transaction. On failure the error is logged, the inbox row is marked
    /// "Failed" on a best-effort basis, and the exception is rethrown for redelivery.
    /// </summary>
    public async Task Consume(ConsumeContext<OrderCreatedEvent> ctx)
    {
        var ct = ctx.CancellationToken;
        var messageId = ctx.MessageId?.ToString() ?? Guid.NewGuid().ToString();
        var source = ctx.SourceAddress?.ToString() ?? "unknown";

        var strategy = context.Database.CreateExecutionStrategy();
        try
        {
            await strategy.ExecuteAsync(async () =>
            {
                // Every attempt starts from a clean change tracker and re-reads the
                // inbox row, so a retry never reuses entities from a failed attempt.
                context.ChangeTracker.Clear();

                // Disposing an uncommitted transaction rolls it back.
                await using var transaction = await context.Database.BeginTransactionAsync(ct);

                // Check inbox for duplicate
                var inboxMessage = await context.InboxMessages
                    .FirstOrDefaultAsync(m => m.MessageId == messageId, ct);
                if (inboxMessage?.ProcessedAt is not null)
                {
                    logger.LogInformation("Message {MessageId} already processed", messageId);
                    return;
                }

                // Record message in inbox
                if (inboxMessage is null)
                {
                    inboxMessage = new InboxMessage
                    {
                        MessageId = messageId,
                        Source = source,
                        ReceivedAt = DateTimeOffset.UtcNow
                    };
                    context.InboxMessages.Add(inboxMessage);
                }

                // Process the message
                await orderService.HandleOrderCreatedAsync(ctx.Message, ct);

                // Mark as processed
                inboxMessage.ProcessedAt = DateTimeOffset.UtcNow;
                inboxMessage.Status = "Processed";

                await context.SaveChangesAsync(ct);
                await transaction.CommitAsync(ct);
                logger.LogInformation("Message {MessageId} processed successfully", messageId);
            });
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to process message {MessageId}", messageId);
            await TryMarkFailedAsync(messageId, source);
            throw;
        }
    }

    // Best-effort bookkeeping after a failed attempt. The change tracker is cleared
    // first: saving without that would persist whatever the rolled-back attempt left
    // tracked, committing part of the failed work outside any transaction.
    private async Task TryMarkFailedAsync(string messageId, string source)
    {
        try
        {
            context.ChangeTracker.Clear();

            var inboxMessage = await context.InboxMessages
                .FirstOrDefaultAsync(m => m.MessageId == messageId);
            if (inboxMessage is null)
            {
                context.InboxMessages.Add(new InboxMessage
                {
                    MessageId = messageId,
                    Source = source,
                    ReceivedAt = DateTimeOffset.UtcNow,
                    Status = "Failed"
                });
            }
            else if (inboxMessage.ProcessedAt is null)
            {
                inboxMessage.Status = "Failed";
            }

            await context.SaveChangesAsync();
        }
        catch (Exception markError)
        {
            // Never let bookkeeping hide the original processing failure.
            logger.LogWarning(markError, "Could not mark inbox message {MessageId} as failed", messageId);
        }
    }
}
Two-Phase Commit Alternative: Try-Confirm/Cancel (TCC)
TCC is a lightweight alternative to traditional two-phase commit, suitable for microservices.
// TCC transaction coordinator
/// <summary>
/// A service taking part in a Try-Confirm/Cancel transaction.
/// </summary>
public interface ITccParticipant
{
// Try phase: tentatively reserve what the transaction needs.
// Returns false when the reservation could not be made; the coordinator then cancels.
Task<bool> TryAsync(Guid transactionId, CancellationToken ct);
// Confirm phase: make the reservation for this transaction final.
Task ConfirmAsync(Guid transactionId, CancellationToken ct);
// Cancel phase: release the reservation for this transaction.
Task CancelAsync(Guid transactionId, CancellationToken ct);
}
/// <summary>
/// Payment participant for Try-Confirm/Cancel: funds are reserved in the try phase,
/// then captured on confirm or released on cancel.
/// </summary>
/// <remarks>
/// Confirm and cancel are no-ops when the reservation is already in the target state,
/// so a coordinator retry neither captures funds twice nor releases them twice.
/// </remarks>
public class PaymentTccParticipant(
    IPaymentGateway gateway,
    IRepository<PaymentReservation> repository) : ITccParticipant
{
    /// <summary>Reserves funds without charging and stores the reservation.</summary>
    /// <returns>True when the gateway reserved the funds.</returns>
    public async Task<bool> TryAsync(Guid transactionId, CancellationToken ct)
    {
        // Reserve funds without actually charging
        var reservation = await gateway.ReserveFundsAsync(transactionId, ct);
        if (reservation.Success)
        {
            await repository.AddAsync(new PaymentReservation
            {
                TransactionId = transactionId,
                ReservationId = reservation.Id,
                Status = "Reserved",
                ExpiresAt = DateTimeOffset.UtcNow.AddMinutes(10)
            }, ct);
        }
        return reservation.Success;
    }

    /// <summary>Captures the reserved funds. Does nothing if no reservation exists or it is already confirmed.</summary>
    public async Task ConfirmAsync(Guid transactionId, CancellationToken ct)
    {
        var reservation = await repository.GetByTransactionIdAsync(transactionId, ct);
        if (reservation is null || reservation.Status == "Confirmed")
        {
            return;
        }

        await gateway.CaptureReservedFundsAsync(reservation.ReservationId, ct);
        reservation.Status = "Confirmed";
        await repository.UpdateAsync(reservation, ct);
    }

    /// <summary>Releases the reserved funds. Does nothing if no reservation exists or it is already cancelled.</summary>
    public async Task CancelAsync(Guid transactionId, CancellationToken ct)
    {
        var reservation = await repository.GetByTransactionIdAsync(transactionId, ct);
        if (reservation is null || reservation.Status == "Cancelled")
        {
            return;
        }

        await gateway.ReleaseReservedFundsAsync(reservation.ReservationId, ct);
        reservation.Status = "Cancelled";
        await repository.UpdateAsync(reservation, ct);
    }
}
// TCC Coordinator
/// <summary>
/// Coordinates a Try-Confirm/Cancel transaction across all registered participants.
/// </summary>
public class TccCoordinator(
    IEnumerable<ITccParticipant> participants,
    ILogger<TccCoordinator> logger)
{
    /// <summary>
    /// Runs the try phase on every participant; if all succeed, confirms all of them,
    /// otherwise cancels every participant whose try was started.
    /// </summary>
    /// <returns>True when every try succeeded and the confirm phase ran; false when the transaction was cancelled.</returns>
    /// <remarks>
    /// A confirm failure is logged and does not change the result; participants are
    /// expected to support a later, idempotent confirm retry.
    /// </remarks>
    public async Task<bool> ExecuteAsync(Guid transactionId, CancellationToken ct = default)
    {
        // Materialize once: the sequence is walked in both phases.
        var all = participants.ToList();
        var attempted = new List<ITccParticipant>(all.Count);

        // Phase 1: Try
        foreach (var participant in all)
        {
            // Register the participant BEFORE calling Try. If Try throws, its outcome is
            // unknown (it may have reserved resources), so it must be cancelled as well.
            attempted.Add(participant);

            bool success;
            try
            {
                success = await participant.TryAsync(transactionId, ct);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Exception in try phase for transaction {TransactionId}", transactionId);
                await CancelAllAsync(transactionId, attempted, CancellationToken.None);
                return false;
            }

            if (!success)
            {
                logger.LogWarning("Try phase failed for transaction {TransactionId}", transactionId);
                await CancelAllAsync(transactionId, attempted, CancellationToken.None);
                return false;
            }
        }

        // Phase 2: Confirm
        foreach (var participant in all)
        {
            try
            {
                await participant.ConfirmAsync(transactionId, ct);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Exception in confirm phase for transaction {TransactionId}", transactionId);
                // Continue confirming others - confirmation should be idempotent
            }
        }

        return true;
    }

    // Cancels each given participant; a failing cancel is logged and the rest still run.
    // Callers pass CancellationToken.None so that compensation is not abandoned when the
    // caller's own token is what caused the failure.
    private async Task CancelAllAsync(
        Guid transactionId,
        IEnumerable<ITccParticipant> participants,
        CancellationToken ct)
    {
        foreach (var participant in participants)
        {
            try
            {
                await participant.CancelAsync(transactionId, ct);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Exception in cancel phase for transaction {TransactionId}", transactionId);
            }
        }
    }
}
Dead Letter Queue Handling
// Dead letter queue processor
/// <summary>
/// Background service that drains the dead-letter sub-queue of the "orders" queue:
/// each dead-lettered message is logged as a warning and then completed.
/// </summary>
public class DeadLetterProcessor(
    ServiceBusClient client,
    ILogger<DeadLetterProcessor> logger) : BackgroundService
{
    /// <summary>
    /// Receives dead-letter messages in batches of up to 10 until the host requests shutdown.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // The receiver holds a link to the broker; dispose it when the service stops.
        await using var receiver = client.CreateReceiver(
            "orders",
            new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter
            });

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var messages = await receiver.ReceiveMessagesAsync(
                    maxMessages: 10,
                    maxWaitTime: TimeSpan.FromSeconds(5),
                    cancellationToken: stoppingToken);

                foreach (var message in messages)
                {
                    logger.LogWarning(
                        "Dead letter message: {MessageId}, Reason: {Reason}, Description: {Description}",
                        message.MessageId,
                        message.DeadLetterReason,
                        message.DeadLetterErrorDescription);

                    // Log to external monitoring system
                    // Optionally attempt reprocessing or manual intervention

                    // Completing removes the message from the dead-letter queue, so anything
                    // that must be kept has to be captured before this call.
                    await receiver.CompleteMessageAsync(message, stoppingToken);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown, not a processing error.
                break;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error processing dead letter queue");

                try
                {
                    // Back off before retrying so a persistent failure does not spin the loop.
                    await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Shutdown requested while backing off.
                    break;
                }
            }
        }
    }
}
Performance Optimization: Parallel Saga Execution
// Parallel saga steps where operations are independent
/// <summary>
/// Saga state machine variant that publishes the inventory, customer and pricing
/// commands concurrently when processing starts, then moves to <c>ValidationInProgress</c>.
/// </summary>
/// <remarks>
/// NOTE(review): <c>OrderCreated</c>, <c>StartProcessing</c>, <c>ValidationInProgress</c> and
/// <c>OrderSagaState.CustomerId</c> are not declared in the code shown here; confirm they
/// exist in the full implementation.
/// </remarks>
public class OptimizedOrderSagaStateMachine : MassTransitStateMachine<OrderSagaState>
{
    public OptimizedOrderSagaStateMachine()
    {
        During(OrderCreated,
            When(StartProcessing)
                // ThenAsync, not Then: Then takes a synchronous Action, so an async lambda
                // there becomes async void - the state transition would not wait for the
                // publishes and their exceptions would go unobserved.
                .ThenAsync(async context =>
                {
                    // Execute independent operations in parallel
                    var inventoryTask = context.Publish<CheckInventoryCommand>(new
                    {
                        context.Saga.OrderId
                    });
                    var customerTask = context.Publish<ValidateCustomerCommand>(new
                    {
                        context.Saga.CustomerId
                    });
                    var pricingTask = context.Publish<CalculatePricingCommand>(new
                    {
                        context.Saga.OrderId
                    });

                    // Wait for all to complete
                    await Task.WhenAll(inventoryTask, customerTask, pricingTask);
                })
                .TransitionTo(ValidationInProgress)
        );
    }
}
Testing Strategies
Unit Testing Saga State Machines
/// <summary>
/// Tests for <c>OrderSagaStateMachine</c> run against MassTransit's in-memory test harness.
/// </summary>
public class OrderSagaStateMachineTests
{
    /// <summary>
    /// Publishing an <c>OrderCreatedEvent</c> must be consumed by the saga, leave a saga
    /// instance in the <c>PaymentProcessing</c> state and publish a <c>ProcessPaymentCommand</c>.
    /// </summary>
    [Fact]
    public async Task Should_Transition_To_PaymentProcessing_When_Order_Created()
    {
        // Arrange
        var testHarness = new InMemoryTestHarness();
        var sagaTestHarness = testHarness.StateMachineSaga<OrderSagaState, OrderSagaStateMachine>();
        await testHarness.Start();

        try
        {
            var sagaId = Guid.NewGuid();
            var orderCreated = new OrderCreatedEvent
            {
                OrderId = Guid.NewGuid(),
                CustomerId = "customer-123",
                TotalAmount = 100.00m,
                CorrelationId = sagaId
            };

            // Act
            await testHarness.Bus.Publish(orderCreated);

            // Assert
            var eventConsumed = await sagaTestHarness.Consumed.Any<OrderCreatedEvent>();
            Assert.True(eventConsumed);

            var machine = sagaTestHarness.StateMachine;
            var instance = sagaTestHarness.Created.ContainsInState(
                sagaId,
                machine,
                machine.PaymentProcessing);
            Assert.NotNull(instance);

            var paymentRequested = await testHarness.Published.Any<ProcessPaymentCommand>();
            Assert.True(paymentRequested);
        }
        finally
        {
            // Always stop the harness, even when an assertion fails.
            await testHarness.Stop();
        }
    }
}
Integration Testing with Testcontainers
/// <summary>
/// Integration test that runs <c>OrderSagaStateMachine</c> over a real RabbitMQ broker
/// started with Testcontainers.
/// </summary>
public class OrderSagaIntegrationTests : IAsyncLifetime
{
    // Started and disposed with the test class; the test below keeps saga state in the
    // in-memory repository, so this container is not queried by it.
    private readonly PostgreSqlContainer _postgres = new PostgreSqlBuilder()
        .WithImage("postgres:16-alpine")
        .Build();

    private readonly RabbitMqContainer _rabbitmq = new RabbitMqBuilder()
        .WithImage("rabbitmq:3-management-alpine")
        .Build();

    /// <summary>Starts both containers before the test runs.</summary>
    public async Task InitializeAsync()
    {
        // The containers are independent, so start them concurrently.
        await Task.WhenAll(_postgres.StartAsync(), _rabbitmq.StartAsync());
    }

    /// <summary>
    /// Publishes an <c>OrderCreatedEvent</c> through RabbitMQ and gives the saga time to process it.
    /// </summary>
    [Fact]
    public async Task Should_Complete_Full_Order_Saga()
    {
        // Arrange
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.AddSagaStateMachine<OrderSagaStateMachine, OrderSagaState>()
                .InMemoryRepository();
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(_rabbitmq.GetConnectionString());
                cfg.ConfigureEndpoints(context);
            });
        });

        // Dispose the container so everything it created is released after the test.
        await using var provider = services.BuildServiceProvider();

        // IBusControl (not IBus) is the interface that exposes StartAsync/StopAsync.
        var bus = provider.GetRequiredService<IBusControl>();
        await bus.StartAsync();

        try
        {
            var orderId = Guid.NewGuid();

            // Act
            await bus.Publish(new OrderCreatedEvent
            {
                OrderId = orderId,
                CustomerId = "test-customer",
                TotalAmount = 150.00m
            });

            // Wait for saga completion
            await Task.Delay(TimeSpan.FromSeconds(5));

            // Assert - verify saga completed successfully
            // This would query your saga repository in a real test
        }
        finally
        {
            await bus.StopAsync();
        }
    }

    /// <summary>Disposes both containers after the test has run.</summary>
    public async Task DisposeAsync()
    {
        try
        {
            await _postgres.DisposeAsync();
        }
        finally
        {
            // Dispose RabbitMQ even when disposing PostgreSQL throws.
            await _rabbitmq.DisposeAsync();
        }
    }
}
Recommended Resources
Official Documentation
Books
- Microservices Patterns by Chris Richardson
- Building Microservices by Sam Newman (2nd Edition)
- Software Architecture: The Hard Parts by Neal Ford et al.