Common Patterns
Practical Nimbus patterns for real-world distributed system scenarios
Overview
This page covers practical patterns that come up repeatedly when building distributed systems with Nimbus. For a complete worked example across multiple services, see the Cafe Application.
1. Command with Acknowledgement
When you need to know a command was processed successfully, pair it with a request/response:
// Instead of fire-and-forget command, use a request
public class PlaceOrderRequest : IBusRequest<PlaceOrderRequest, PlaceOrderResponse>
{
public string CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
}
public class PlaceOrderResponse : IBusResponse
{
public string OrderId { get; set; }
public OrderStatus Status { get; set; }
public string ErrorMessage { get; set; }
}
public class PlaceOrderHandler : IHandleRequest<PlaceOrderRequest, PlaceOrderResponse>
{
public async Task<PlaceOrderResponse> Handle(PlaceOrderRequest request)
{
try
{
var orderId = await _orderRepository.Create(request);
await _bus.Publish(new OrderPlacedEvent { OrderId = orderId });
return new PlaceOrderResponse
{
OrderId = orderId,
Status = OrderStatus.Accepted
};
}
catch (InsufficientInventoryException ex)
{
return new PlaceOrderResponse
{
Status = OrderStatus.Rejected,
ErrorMessage = ex.Message
};
}
}
}
// Caller
var result = await bus.Request<PlaceOrderRequest, PlaceOrderResponse>(
new PlaceOrderRequest { CustomerId = "CUST-123", Items = cartItems });
if (result.Status == OrderStatus.Rejected)
{
Console.WriteLine($"Order rejected: {result.ErrorMessage}");
}
2. Fan-Out with Aggregation
Send work to multiple handlers and aggregate the results using multicast requests:
public class GetShippingQuoteRequest
: IBusMulticastRequest<GetShippingQuoteRequest, ShippingQuoteDto>
{
public string FromZip { get; set; }
public string ToZip { get; set; }
public decimal WeightKg { get; set; }
}
public class ShippingQuoteDto : IBusMulticastResponse
{
public string Carrier { get; set; }
public decimal Price { get; set; }
public int EstimatedDays { get; set; }
}
// Each shipping provider registers its own handler
public class FedExQuoteHandler : IHandleMulticastRequest<GetShippingQuoteRequest, ShippingQuoteDto>
{
public async Task<ShippingQuoteDto> Handle(GetShippingQuoteRequest request)
{
var rate = await _fedExApi.GetRate(request.FromZip, request.ToZip, request.WeightKg);
return rate == null ? null : new ShippingQuoteDto
{
Carrier = "FedEx",
Price = rate.Price,
EstimatedDays = rate.Days
};
}
}
// Caller collects all quotes and picks cheapest
var quotes = await Task.WhenAll(
bus.MulticastRequest<GetShippingQuoteRequest, ShippingQuoteDto>(
new GetShippingQuoteRequest { FromZip = "10001", ToZip = "90210", WeightKg = 2.5m },
timeout: TimeSpan.FromSeconds(5))
.Select(async t => { try { return await t; } catch { return null; } }));
var cheapest = quotes
.Where(q => q != null)
.OrderBy(q => q.Price)
.FirstOrDefault();
3. Saga / Process Manager
Coordinate a multi-step workflow using events and state:
// Saga state stored in a database
public class OrderSagaState
{
public string OrderId { get; set; }
public bool PaymentReceived { get; set; }
public bool InventoryReserved { get; set; }
public bool ShipmentScheduled { get; set; }
}
// The saga listens to events and orchestrates next steps
public class OrderSaga :
IHandleCompetingEvent<OrderPlacedEvent>,
IHandleCompetingEvent<PaymentReceivedEvent>,
IHandleCompetingEvent<InventoryReservedEvent>
{
private readonly IOrderSagaRepository _sagaRepo;
private readonly IBus _bus;
public async Task Handle(OrderPlacedEvent @event)
{
var state = new OrderSagaState { OrderId = @event.OrderId };
await _sagaRepo.Save(state);
// Kick off parallel steps
await Task.WhenAll(
_bus.Send(new ProcessPaymentCommand { OrderId = @event.OrderId, Amount = @event.Amount }),
_bus.Send(new ReserveInventoryCommand { OrderId = @event.OrderId, Items = @event.Items })
);
}
public async Task Handle(PaymentReceivedEvent @event)
{
var state = await _sagaRepo.GetById(@event.OrderId);
state.PaymentReceived = true;
await _sagaRepo.Save(state);
await TryCompleteOrder(state);
}
public async Task Handle(InventoryReservedEvent @event)
{
var state = await _sagaRepo.GetById(@event.OrderId);
state.InventoryReserved = true;
await _sagaRepo.Save(state);
await TryCompleteOrder(state);
}
private async Task TryCompleteOrder(OrderSagaState state)
{
if (state.PaymentReceived && state.InventoryReserved)
{
await _bus.Send(new ScheduleShipmentCommand { OrderId = state.OrderId });
}
}
}
4. Idempotent Handler with Deduplication
Safely process messages that may arrive more than once:
public class SendWelcomeEmailHandler : IHandleCompetingEvent<UserRegisteredEvent>
{
private readonly IEmailService _emailService;
private readonly IMessageDeduplicator _dedup;
public async Task Handle(UserRegisteredEvent @event)
{
// Use the event's natural key for deduplication
var dedupKey = $"welcome-email:{@event.UserId}";
if (!await _dedup.TryAcquire(dedupKey, TimeSpan.FromDays(7)))
{
// Already sent — skip
return;
}
await _emailService.SendWelcome(@event.UserId, @event.Email);
}
}
// Simple Redis-based deduplicator
public class RedisMessageDeduplicator : IMessageDeduplicator
{
private readonly IDatabase _redis;
public async Task<bool> TryAcquire(string key, TimeSpan expiry)
{
// SET key 1 NX EX — only succeeds if key doesn't exist
return await _redis.StringSetAsync(key, "1", expiry, When.NotExists);
}
}
5. Retry with Exponential Backoff
For transient failures, implement backoff within the handler before letting Nimbus retry the whole message:
public class CallExternalApiHandler : IHandleCommand<SyncDataCommand>
{
private readonly IExternalApi _api;
public async Task Handle(SyncDataCommand command)
{
await RetryWithBackoff(async () =>
{
await _api.SyncData(command.ResourceId);
}, maxAttempts: 3);
}
private static async Task RetryWithBackoff(
Func<Task> action, int maxAttempts)
{
for (int attempt = 0; attempt < maxAttempts; attempt++)
{
try
{
await action();
return;
}
catch (HttpRequestException) when (attempt < maxAttempts - 1)
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 1s, 2s, 4s
await Task.Delay(delay);
}
}
// Let the final exception propagate — Nimbus will retry the whole message
}
}
6. Competing Consumers for Parallel Processing
Deploy multiple instances to process commands in parallel — Nimbus and the transport handle the coordination automatically:
// Three identical handlers registered across three service instances
// Each command is processed by exactly one — the transport load-balances
public class ProcessReportHandler : IHandleCommand<GenerateReportCommand>
{
public async Task Handle(GenerateReportCommand command)
{
// Long-running report generation
var report = await _reportEngine.Generate(command.ReportId, command.Parameters);
await _reportStore.Save(report);
await _bus.Publish(new ReportGeneratedEvent { ReportId = command.ReportId });
}
}
# Deploy 3 instances — each handles ~1/3 of the load
dotnet run --instance 1
dotnet run --instance 2
dotnet run --instance 3
No additional configuration needed. Queue-based delivery ensures each command goes to exactly one handler.
7. Switching Transports Per Environment
A single factory method handles all environments:
public static IBus CreateBus(IConfiguration config, IContainer container)
{
var transport = config["Environment"] switch
{
"Development" => (TransportConfiguration)new RedisTransportConfiguration()
.WithConnectionString("localhost:6379"),
"Test" => new InProcessTransportConfiguration(),
"Production" => new AzureServiceBusTransportConfiguration()
.WithConnectionString(config["ServiceBus:ConnectionString"]),
_ => throw new InvalidOperationException("Unknown environment")
};
return new BusBuilder()
.Configure()
.WithTransport(transport)
.WithNames(config["ServiceName"], Environment.MachineName)
.WithTypesFrom(new AssemblyScanningTypeProvider(typeof(Program).Assembly))
.WithAutofacDefaults(container)
.WithJsonSerializer()
.WithSerilogLogger()
.Build();
}
8. Publishing Events from Domain Objects
Collect domain events during a unit of work and publish them after the transaction commits:
public class Order
{
private readonly List<IBusEvent> _events = new();
public IReadOnlyList<IBusEvent> Events => _events;
public void Place(string customerId, List<OrderItem> items)
{
Status = OrderStatus.Pending;
_events.Add(new OrderPlacedEvent { OrderId = Id, CustomerId = customerId });
}
public void Ship(string trackingNumber)
{
Status = OrderStatus.Shipped;
_events.Add(new OrderShippedEvent { OrderId = Id, TrackingNumber = trackingNumber });
}
}
// In the handler — publish after persisting
public class ShipOrderHandler : IHandleCommand<ShipOrderCommand>
{
private readonly IOrderRepository _repository;
private readonly IBus _bus;
public async Task Handle(ShipOrderCommand command)
{
var order = await _repository.GetById(command.OrderId);
order.Ship(command.TrackingNumber);
// Save first, then publish events
await _repository.Save(order);
foreach (var @event in order.Events)
{
await _bus.Publish(@event);
}
}
}
For true transactional guarantees between your database and the message bus, look into the Outbox pattern — store events in the same database transaction, then publish them asynchronously.
Next Steps
- Cafe Application — a complete multi-service example using many of these patterns
- Error Handling — retries, dead letters, and idempotency
- Message Patterns — when to use each pattern