Common Patterns

Practical Nimbus patterns for real-world distributed system scenarios

Overview

This page covers practical patterns that come up repeatedly when building distributed systems with Nimbus. For a complete worked example across multiple services, see the Cafe Application.


1. Command with Acknowledgement

When you need to know that an operation was processed successfully, model it as a request/response instead of a fire-and-forget command:

// Instead of fire-and-forget command, use a request
/// <summary>
/// Request to place an order. The bus answers it with a
/// <see cref="PlaceOrderResponse"/>, so the caller learns the outcome.
/// </summary>
public class PlaceOrderRequest : IBusRequest<PlaceOrderRequest, PlaceOrderResponse>
{
    /// <summary>Identifier of the customer placing the order.</summary>
    public string CustomerId { get; set; }

    /// <summary>
    /// Items being ordered. Starts as an empty list so the property is not
    /// null when the caller does not set it.
    /// </summary>
    public List<OrderItem> Items { get; set; } = new List<OrderItem>();
}

/// <summary>
/// Reply to a <see cref="PlaceOrderRequest"/>, describing whether the order
/// was accepted or rejected.
/// </summary>
public class PlaceOrderResponse : IBusResponse
{
    /// <summary>Identifier of the order; populated by the handler on success.</summary>
    public string OrderId { get; set; }

    /// <summary>Outcome of the request (for example Accepted or Rejected).</summary>
    public OrderStatus Status { get; set; }

    /// <summary>Reason for the rejection; populated by the handler when it rejects the order.</summary>
    public string ErrorMessage { get; set; }
}

/// <summary>
/// Handles <see cref="PlaceOrderRequest"/>: creates the order, publishes an
/// <c>OrderPlacedEvent</c>, and reports the outcome to the caller.
/// </summary>
public class PlaceOrderHandler : IHandleRequest<PlaceOrderRequest, PlaceOrderResponse>
{
    private readonly IOrderRepository _orderRepository;
    private readonly IBus _bus;

    /// <summary>Creates the handler with its collaborators (supplied by the container).</summary>
    /// <exception cref="ArgumentNullException">A dependency is null.</exception>
    public PlaceOrderHandler(IOrderRepository orderRepository, IBus bus)
    {
        _orderRepository = orderRepository ?? throw new ArgumentNullException(nameof(orderRepository));
        _bus = bus ?? throw new ArgumentNullException(nameof(bus));
    }

    /// <summary>
    /// Creates the order and returns an Accepted response, or a Rejected
    /// response carrying the error message when inventory is insufficient.
    /// </summary>
    /// <param name="request">The order to place.</param>
    /// <returns>The response sent back to the requester.</returns>
    public async Task<PlaceOrderResponse> Handle(PlaceOrderRequest request)
    {
        try
        {
            var orderId = await _orderRepository.Create(request);
            await _bus.Publish(new OrderPlacedEvent { OrderId = orderId });

            return new PlaceOrderResponse
            {
                OrderId = orderId,
                Status = OrderStatus.Accepted
            };
        }
        catch (InsufficientInventoryException ex)
        {
            // An expected business failure is reported as a normal response.
            // Any other exception is not caught here and propagates to the caller of Handle.
            return new PlaceOrderResponse
            {
                Status = OrderStatus.Rejected,
                ErrorMessage = ex.Message
            };
        }
    }
}

// Caller
// Build the request up front, then await the handler's reply.
var placeOrder = new PlaceOrderRequest
{
    CustomerId = "CUST-123",
    Items = cartItems
};

var result = await bus.Request<PlaceOrderRequest, PlaceOrderResponse>(placeOrder);

// A rejected order carries its reason in ErrorMessage.
var wasRejected = result.Status == OrderStatus.Rejected;
if (wasRejected)
{
    Console.WriteLine($"Order rejected: {result.ErrorMessage}");
}

2. Fan-Out with Aggregation

Send work to multiple handlers and aggregate the results using multicast requests:

/// <summary>
/// Multicast request for shipping quotes; each responding handler answers
/// with a <see cref="ShippingQuoteDto"/>.
/// </summary>
public class GetShippingQuoteRequest : IBusMulticastRequest<GetShippingQuoteRequest, ShippingQuoteDto>
{
    /// <summary>ZIP code the shipment is sent from.</summary>
    public string FromZip { get; set; }

    /// <summary>ZIP code the shipment is sent to.</summary>
    public string ToZip { get; set; }

    /// <summary>Shipment weight in kilograms.</summary>
    public decimal WeightKg { get; set; }
}

/// <summary>
/// One carrier's answer to a <see cref="GetShippingQuoteRequest"/>.
/// </summary>
public class ShippingQuoteDto : IBusMulticastResponse
{
    /// <summary>Name of the carrier that produced the quote (for example "FedEx").</summary>
    public string Carrier { get; set; }

    /// <summary>Quoted price for the shipment.</summary>
    public decimal Price { get; set; }

    /// <summary>Estimated delivery time in days.</summary>
    public int EstimatedDays { get; set; }
}

// Each shipping provider registers its own handler
/// <summary>
/// Answers <see cref="GetShippingQuoteRequest"/> with a FedEx quote.
/// </summary>
public class FedExQuoteHandler : IHandleMulticastRequest<GetShippingQuoteRequest, ShippingQuoteDto>
{
    private readonly IFedExApi _fedExApi;

    /// <summary>Creates the handler with the FedEx rate client (supplied by the container).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="fedExApi"/> is null.</exception>
    public FedExQuoteHandler(IFedExApi fedExApi)
    {
        _fedExApi = fedExApi ?? throw new ArgumentNullException(nameof(fedExApi));
    }

    /// <summary>
    /// Looks up the FedEx rate for the requested shipment.
    /// </summary>
    /// <param name="request">Origin, destination and weight of the shipment.</param>
    /// <returns>The quote, or <c>null</c> when the rate lookup returns no rate.</returns>
    public async Task<ShippingQuoteDto> Handle(GetShippingQuoteRequest request)
    {
        var rate = await _fedExApi.GetRate(request.FromZip, request.ToZip, request.WeightKg);
        if (rate == null)
        {
            // No rate for this shipment: return no quote rather than an empty one.
            return null;
        }

        return new ShippingQuoteDto
        {
            Carrier = "FedEx",
            Price = rate.Price,
            EstimatedDays = rate.Days
        };
    }
}

// Caller collects all quotes and picks cheapest
var quoteRequest = new GetShippingQuoteRequest { FromZip = "10001", ToZip = "90210", WeightKg = 2.5m };

// NOTE(review): this treats MulticastRequest as yielding one task per
// responder — confirm against the IBus signature in use.
var pendingQuotes = bus
    .MulticastRequest<GetShippingQuoteRequest, ShippingQuoteDto>(
        quoteRequest,
        timeout: TimeSpan.FromSeconds(5))
    .Select(async pending =>
    {
        // A responder that fails contributes no quote instead of failing the whole batch.
        try
        {
            return await pending;
        }
        catch
        {
            return null;
        }
    });

var quotes = await Task.WhenAll(pendingQuotes);

// Drop missing quotes, then take the lowest price (null when nobody answered).
var cheapest = quotes
    .Where(quote => quote != null)
    .OrderBy(quote => quote.Price)
    .FirstOrDefault();

3. Saga / Process Manager

Coordinate a multi-step workflow using events and state:

// Saga state stored in a database
/// <summary>
/// Persisted progress of one order's workflow, keyed by <see cref="OrderId"/>.
/// Each flag records that the corresponding step has been observed.
/// </summary>
public class OrderSagaState
{
    /// <summary>Identifier of the order this state belongs to.</summary>
    public string OrderId { get; set; }

    /// <summary>True once payment for the order has been received.</summary>
    public bool PaymentReceived { get; set; }

    /// <summary>True once inventory for the order has been reserved.</summary>
    public bool InventoryReserved { get; set; }

    /// <summary>True once shipment of the order has been scheduled.</summary>
    public bool ShipmentScheduled { get; set; }
}

// The saga listens to events and orchestrates next steps
/// <summary>
/// Process manager for an order: starts payment and inventory reservation
/// when the order is placed, and schedules shipment once both have completed.
/// </summary>
public class OrderSaga :
    IHandleCompetingEvent<OrderPlacedEvent>,
    IHandleCompetingEvent<PaymentReceivedEvent>,
    IHandleCompetingEvent<InventoryReservedEvent>
{
    private readonly IOrderSagaRepository _sagaRepo;
    private readonly IBus _bus;

    /// <summary>Creates the saga with its collaborators (supplied by the container).</summary>
    /// <exception cref="ArgumentNullException">A dependency is null.</exception>
    public OrderSaga(IOrderSagaRepository sagaRepo, IBus bus)
    {
        _sagaRepo = sagaRepo ?? throw new ArgumentNullException(nameof(sagaRepo));
        _bus = bus ?? throw new ArgumentNullException(nameof(bus));
    }

    /// <summary>Creates the saga state for a new order and starts both steps.</summary>
    public async Task Handle(OrderPlacedEvent @event)
    {
        var state = new OrderSagaState { OrderId = @event.OrderId };
        await _sagaRepo.Save(state);

        // Kick off parallel steps
        await Task.WhenAll(
            _bus.Send(new ProcessPaymentCommand { OrderId = @event.OrderId, Amount = @event.Amount }),
            _bus.Send(new ReserveInventoryCommand { OrderId = @event.OrderId, Items = @event.Items })
        );
    }

    /// <summary>Records that payment arrived, then completes the order if possible.</summary>
    public async Task Handle(PaymentReceivedEvent @event)
    {
        var state = await _sagaRepo.GetById(@event.OrderId);
        state.PaymentReceived = true;
        await _sagaRepo.Save(state);
        await TryCompleteOrder(state);
    }

    /// <summary>Records that inventory was reserved, then completes the order if possible.</summary>
    public async Task Handle(InventoryReservedEvent @event)
    {
        var state = await _sagaRepo.GetById(@event.OrderId);
        state.InventoryReserved = true;
        await _sagaRepo.Save(state);
        await TryCompleteOrder(state);
    }

    /// <summary>
    /// Schedules shipment once both prerequisites are met, at most once per
    /// saga state that has already been marked as scheduled.
    /// </summary>
    private async Task TryCompleteOrder(OrderSagaState state)
    {
        // A redelivered event must not schedule a second shipment.
        if (state.ShipmentScheduled)
        {
            return;
        }

        if (!state.PaymentReceived || !state.InventoryReserved)
        {
            return;
        }

        await _bus.Send(new ScheduleShipmentCommand { OrderId = state.OrderId });

        // Mark only after the send succeeded, so a failed send is retried
        // instead of being recorded as done.
        // NOTE(review): two handlers running concurrently for the same order
        // can still both pass the checks above; closing that gap presumably
        // needs a concurrency check in the repository — confirm.
        state.ShipmentScheduled = true;
        await _sagaRepo.Save(state);
    }
}

4. Idempotent Handler with Deduplication

Safely process messages that may arrive more than once:

/// <summary>
/// Sends the welcome email for a newly registered user, skipping events
/// whose deduplication key has already been claimed.
/// </summary>
public class SendWelcomeEmailHandler : IHandleCompetingEvent<UserRegisteredEvent>
{
    private readonly IEmailService _emailService;
    private readonly IMessageDeduplicator _dedup;

    /// <summary>Sends the welcome email unless this user's key was already claimed.</summary>
    public async Task Handle(UserRegisteredEvent @event)
    {
        // The user id is the natural key: one welcome email per user.
        var claimKey = $"welcome-email:{@event.UserId}";
        var retention = TimeSpan.FromDays(7);

        // NOTE(review): the key is claimed before the email is sent, so if
        // SendWelcome throws, a retry within the retention window is skipped —
        // confirm this at-most-once behaviour is intended.
        var isFirstDelivery = await _dedup.TryAcquire(claimKey, retention);
        if (isFirstDelivery)
        {
            await _emailService.SendWelcome(@event.UserId, @event.Email);
        }
    }
}

// Simple Redis-based deduplicator
/// <summary>
/// <see cref="IMessageDeduplicator"/> backed by a Redis string key with an expiry.
/// </summary>
public class RedisMessageDeduplicator : IMessageDeduplicator
{
    private readonly IDatabase _redis;

    /// <summary>
    /// Attempts to claim <paramref name="key"/> for the duration of <paramref name="expiry"/>.
    /// </summary>
    /// <param name="key">Deduplication key to claim.</param>
    /// <param name="expiry">How long the claim is kept.</param>
    /// <returns>The result of the conditional set: true when the key was written.</returns>
    public async Task<bool> TryAcquire(string key, TimeSpan expiry)
    {
        // Conditional write: When.NotExists makes the set succeed only for a new key.
        var wasSet = await _redis.StringSetAsync(key, "1", expiry, When.NotExists);
        return wasSet;
    }
}

5. Retry with Exponential Backoff

For transient failures, implement backoff within the handler before letting Nimbus retry the whole message:

/// <summary>
/// Handles <see cref="SyncDataCommand"/> by calling the external API, retrying
/// transient HTTP failures with exponential backoff before giving up.
/// </summary>
public class CallExternalApiHandler : IHandleCommand<SyncDataCommand>
{
    private readonly IExternalApi _api;

    /// <summary>Creates the handler with the external API client (supplied by the container).</summary>
    /// <exception cref="ArgumentNullException"><paramref name="api"/> is null.</exception>
    public CallExternalApiHandler(IExternalApi api)
    {
        _api = api ?? throw new ArgumentNullException(nameof(api));
    }

    /// <summary>Synchronises the resource named by the command, with up to 3 attempts.</summary>
    public async Task Handle(SyncDataCommand command)
    {
        await RetryWithBackoff(async () =>
        {
            await _api.SyncData(command.ResourceId);
        }, maxAttempts: 3);
    }

    /// <summary>
    /// Runs <paramref name="action"/> up to <paramref name="maxAttempts"/> times,
    /// waiting 1s, 2s, 4s, ... between attempts when it throws
    /// <see cref="HttpRequestException"/>.
    /// </summary>
    /// <param name="action">The operation to attempt.</param>
    /// <param name="maxAttempts">Total number of attempts; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxAttempts"/> is less than 1.</exception>
    private static async Task RetryWithBackoff(
        Func<Task> action, int maxAttempts)
    {
        // With zero attempts the action would never run and the method would
        // report success, so reject that up front.
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxAttempts), maxAttempts, "At least one attempt is required.");
        }

        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            try
            {
                await action();
                return;
            }
            // The filter is false on the last attempt, so that exception is not
            // caught and propagates out of this method to the caller.
            catch (HttpRequestException) when (attempt < maxAttempts - 1)
            {
                // Delay doubles after each failed attempt: 1s, then 2s, then 4s, ...
                // (with maxAttempts = 3 only the 1s and 2s waits occur).
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt));
                await Task.Delay(delay);
            }
        }
    }
}

6. Competing Consumers for Parallel Processing

Deploy multiple instances to process commands in parallel — Nimbus and the transport handle the coordination automatically:

// Three identical handlers registered across three service instances
// Each command is processed by exactly one — the transport load-balances
/// <summary>
/// Handles <see cref="GenerateReportCommand"/>: generates the report, stores it,
/// and publishes a <c>ReportGeneratedEvent</c>.
/// </summary>
public class ProcessReportHandler : IHandleCommand<GenerateReportCommand>
{
    private readonly IReportEngine _reportEngine;
    private readonly IReportStore _reportStore;
    private readonly IBus _bus;

    /// <summary>Creates the handler with its collaborators (supplied by the container).</summary>
    /// <exception cref="ArgumentNullException">A dependency is null.</exception>
    public ProcessReportHandler(IReportEngine reportEngine, IReportStore reportStore, IBus bus)
    {
        _reportEngine = reportEngine ?? throw new ArgumentNullException(nameof(reportEngine));
        _reportStore = reportStore ?? throw new ArgumentNullException(nameof(reportStore));
        _bus = bus ?? throw new ArgumentNullException(nameof(bus));
    }

    /// <summary>Generates and stores the requested report, then announces it.</summary>
    public async Task Handle(GenerateReportCommand command)
    {
        // Long-running report generation
        var report = await _reportEngine.Generate(command.ReportId, command.Parameters);
        await _reportStore.Save(report);

        // Publish only after the report has been saved.
        await _bus.Publish(new ReportGeneratedEvent { ReportId = command.ReportId });
    }
}
# Deploy 3 instances — each handles ~1/3 of the load
# NOTE(review): each `dotnet run` presumably stays in the foreground while the
# service is running, so start each instance in its own terminal or host — confirm.
dotnet run --instance 1
dotnet run --instance 2
dotnet run --instance 3

No additional configuration needed. Queue-based delivery ensures each command goes to exactly one handler.


7. Switching Transports Per Environment

A single factory method handles all environments:

/// <summary>
/// Builds the bus for the current environment, choosing the transport from
/// the "Environment" configuration value.
/// </summary>
/// <param name="config">
/// Application configuration. Reads "Environment", "ServiceName",
/// "ServiceBus:ConnectionString" (Production) and, optionally,
/// "Redis:ConnectionString" (Development; falls back to "localhost:6379").
/// </param>
/// <param name="container">Container passed to the bus builder's Autofac defaults.</param>
/// <returns>The bus produced by the builder.</returns>
/// <exception cref="InvalidOperationException">The environment name is not recognised.</exception>
public static IBus CreateBus(IConfiguration config, IContainer container)
{
    var environment = config["Environment"];

    var transport = environment switch
    {
        // The cast gives the switch expression a common result type.
        "Development" => (TransportConfiguration)new RedisTransportConfiguration()
            .WithConnectionString(config["Redis:ConnectionString"] ?? "localhost:6379"),

        "Test" => new InProcessTransportConfiguration(),

        "Production" => new AzureServiceBusTransportConfiguration()
            .WithConnectionString(config["ServiceBus:ConnectionString"]),

        // Name the offending value so a misconfiguration can be diagnosed from the error alone.
        _ => throw new InvalidOperationException($"Unknown environment '{environment}'")
    };

    return new BusBuilder()
        .Configure()
        .WithTransport(transport)
        .WithNames(config["ServiceName"], Environment.MachineName)
        .WithTypesFrom(new AssemblyScanningTypeProvider(typeof(Program).Assembly))
        .WithAutofacDefaults(container)
        .WithJsonSerializer()
        .WithSerilogLogger()
        .Build();
}

8. Publishing Events from Domain Objects

Collect domain events during a unit of work and publish them after the transaction commits:

/// <summary>
/// Order aggregate that records domain events as its state changes, so they
/// can be published after the order has been persisted.
/// </summary>
public class Order
{
    private readonly List<IBusEvent> _events = new();

    /// <summary>Identifier of the order; copied into every event it raises.</summary>
    public string Id { get; set; }

    /// <summary>Current status; changed only through <see cref="Place"/> and <see cref="Ship"/>.</summary>
    public OrderStatus Status { get; private set; }

    /// <summary>Events raised since creation or the last <see cref="ClearEvents"/> call.</summary>
    public IReadOnlyList<IBusEvent> Events => _events;

    /// <summary>Marks the order as pending and records an <c>OrderPlacedEvent</c>.</summary>
    /// <param name="customerId">Customer placing the order.</param>
    /// <param name="items">
    /// Items being ordered. NOTE(review): not stored or added to the event
    /// here — confirm whether that is intended.
    /// </param>
    public void Place(string customerId, List<OrderItem> items)
    {
        Status = OrderStatus.Pending;
        _events.Add(new OrderPlacedEvent { OrderId = Id, CustomerId = customerId });
    }

    /// <summary>Marks the order as shipped and records an <c>OrderShippedEvent</c>.</summary>
    /// <param name="trackingNumber">Tracking number carried on the event.</param>
    public void Ship(string trackingNumber)
    {
        Status = OrderStatus.Shipped;
        _events.Add(new OrderShippedEvent { OrderId = Id, TrackingNumber = trackingNumber });
    }

    /// <summary>
    /// Discards the recorded events. Call after publishing so the same events
    /// are not published again if this instance is reused.
    /// </summary>
    public void ClearEvents()
    {
        _events.Clear();
    }
}

// In the handler — publish after persisting
/// <summary>
/// Handles <see cref="ShipOrderCommand"/>: ships the order, persists it, and
/// then publishes the events the order recorded.
/// </summary>
public class ShipOrderHandler : IHandleCommand<ShipOrderCommand>
{
    private readonly IOrderRepository _repository;
    private readonly IBus _bus;

    /// <summary>Ships the order named by the command and publishes its events.</summary>
    public async Task Handle(ShipOrderCommand command)
    {
        var aggregate = await _repository.GetById(command.OrderId);
        aggregate.Ship(command.TrackingNumber);

        // Persist before publishing, so events are sent only after the save has succeeded.
        await _repository.Save(aggregate);

        // Publish one at a time, in the order the events were recorded.
        foreach (var domainEvent in aggregate.Events)
        {
            await _bus.Publish(domainEvent);
        }
    }
}

For true transactional guarantees between your database and the message bus, look into the Outbox pattern — store events in the same database transaction, then publish them asynchronously.


Next Steps