Interceptors

Add cross-cutting concerns to Nimbus message handling using inbound and outbound interceptors

Overview

Interceptors allow you to add cross-cutting concerns — logging, metrics, tracing, validation, auditing — to message handling without modifying your handlers. Nimbus provides two interceptor points:

  • Inbound interceptors: Run before and after a handler processes a message
  • Outbound interceptors: Run before and after a message is sent or published

Both types support hooks for the executing, success, and error phases of each message operation.

Inbound Interceptors

Inbound interceptors implement IInboundInterceptor and wrap handler execution:

using System;
using System.Threading.Tasks;
using Nimbus.Interceptors.Inbound;
using Nimbus.Infrastructure;
using Nimbus.MessageContracts;
using Serilog; // Assumption: ILogger is Serilog's, given the message-template calls below

public class LoggingInterceptor : IInboundInterceptor
{
    private readonly ILogger _logger;

    public int Priority => 0; // Lower = runs first

    public LoggingInterceptor(ILogger logger)
    {
        _logger = logger;
    }

    // The hooks do no asynchronous work, so they return a completed task
    // rather than being marked async (which would raise compiler warning CS1998).
    public Task OnCommandHandlerExecuting<TBusCommand>(
        TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand
    {
        _logger.Information("Handling {CommandType} [{MessageId}]",
            typeof(TBusCommand).Name, nimbusMessage.MessageId);
        return Task.CompletedTask;
    }

    public Task OnCommandHandlerSuccess<TBusCommand>(
        TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand
    {
        _logger.Information("Handled {CommandType} successfully [{MessageId}]",
            typeof(TBusCommand).Name, nimbusMessage.MessageId);
        return Task.CompletedTask;
    }

    public Task OnCommandHandlerError<TBusCommand>(
        TBusCommand busCommand, NimbusMessage nimbusMessage, Exception exception)
        where TBusCommand : IBusCommand
    {
        _logger.Error(exception, "Error handling {CommandType} [{MessageId}]",
            typeof(TBusCommand).Name, nimbusMessage.MessageId);
        return Task.CompletedTask;
    }

    // Implement similar methods for events, requests, and multicast requests...
}

Full Interface

IInboundInterceptor has hook methods for all four message pattern types:

| Pattern | Hooks |
|---|---|
| Commands | OnCommandHandlerExecuting, OnCommandHandlerSuccess, OnCommandHandlerError |
| Events | OnEventHandlerExecuting, OnEventHandlerSuccess, OnEventHandlerError |
| Requests | OnRequestHandlerExecuting, OnRequestHandlerSuccess, OnRequestHandlerError |
| Multicast Requests | OnMulticastRequestHandlerExecuting, OnMulticastRequestHandlerSuccess, OnMulticastRequestHandlerError |

C# requires a class to implement every member of an interface, so the hooks your interceptor does not need still have to be present with empty implementations. A base class with no-op implementations reduces that boilerplate.
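The base-class idea can be sketched as follows. This is an illustration only: `ICommandInterceptor`, `NoOpCommandInterceptor`, `AuditInterceptor`, and `PlaceOrder` are hypothetical names, and the interface is a trimmed-down stand-in (command hooks only) so the example compiles without Nimbus. A real base class would implement `IInboundInterceptor` and cover all four message patterns.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in types so the sketch compiles on its own; NOT the real Nimbus definitions.
public interface IBusCommand { }
public class NimbusMessage { public Guid MessageId { get; } = Guid.NewGuid(); }
public class PlaceOrder : IBusCommand { }

// Hypothetical, trimmed-down interceptor interface (command hooks only).
public interface ICommandInterceptor
{
    int Priority { get; }
    Task OnCommandHandlerExecuting<TBusCommand>(TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand;
    Task OnCommandHandlerSuccess<TBusCommand>(TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand;
    Task OnCommandHandlerError<TBusCommand>(TBusCommand busCommand, NimbusMessage nimbusMessage, Exception exception)
        where TBusCommand : IBusCommand;
}

// Base class: every hook is a virtual no-op, so subclasses override only what they need.
public abstract class NoOpCommandInterceptor : ICommandInterceptor
{
    public virtual int Priority => 0;

    public virtual Task OnCommandHandlerExecuting<TBusCommand>(TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand => Task.CompletedTask;

    public virtual Task OnCommandHandlerSuccess<TBusCommand>(TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand => Task.CompletedTask;

    public virtual Task OnCommandHandlerError<TBusCommand>(TBusCommand busCommand, NimbusMessage nimbusMessage, Exception exception)
        where TBusCommand : IBusCommand => Task.CompletedTask;
}

// Overrides a single hook; the generic constraint is inherited from the base method.
public class AuditInterceptor : NoOpCommandInterceptor
{
    public List<string> Audited { get; } = new List<string>();

    public override Task OnCommandHandlerSuccess<TBusCommand>(TBusCommand busCommand, NimbusMessage nimbusMessage)
    {
        Audited.Add(typeof(TBusCommand).Name);
        return Task.CompletedTask;
    }
}

public static class NoOpBaseDemo
{
    public static async Task Main()
    {
        var interceptor = new AuditInterceptor();
        var message = new NimbusMessage();
        await interceptor.OnCommandHandlerExecuting(new PlaceOrder(), message); // inherited no-op
        await interceptor.OnCommandHandlerSuccess(new PlaceOrder(), message);   // overridden hook
        Console.WriteLine(string.Join(", ", interceptor.Audited));              // prints "PlaceOrder"
    }
}
```

Making the hooks virtual rather than abstract is what lets each concrete interceptor stay small: it states its priority and the one or two hooks it cares about.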

Outbound Interceptors

Outbound interceptors implement IOutboundInterceptor and wrap message sending:

using System;
using System.Threading.Tasks;
using Nimbus.Infrastructure;
using Nimbus.Interceptors.Outbound;
using Nimbus.MessageContracts;

// IMetrics is assumed to be an application-defined metrics abstraction, not a Nimbus type.
public class MetricsInterceptor : IOutboundInterceptor
{
    private readonly IMetrics _metrics;

    public int Priority => 0;

    public MetricsInterceptor(IMetrics metrics)
    {
        _metrics = metrics;
    }

    // Each hook increments a counter named after the command type and returns
    // a completed task, since no asynchronous work is involved.
    public Task OnCommandSending<TBusCommand>(
        TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand
    {
        _metrics.Increment($"commands.sending.{typeof(TBusCommand).Name}");
        return Task.CompletedTask;
    }

    public Task OnCommandSent<TBusCommand>(
        TBusCommand busCommand, NimbusMessage nimbusMessage)
        where TBusCommand : IBusCommand
    {
        _metrics.Increment($"commands.sent.{typeof(TBusCommand).Name}");
        return Task.CompletedTask;
    }

    public Task OnCommandSendingError<TBusCommand>(
        TBusCommand busCommand, NimbusMessage nimbusMessage, Exception exception)
        where TBusCommand : IBusCommand
    {
        _metrics.Increment($"commands.error.{typeof(TBusCommand).Name}");
        return Task.CompletedTask;
    }

    // Implement similar methods for events, requests, and multicast requests...
}

Full Interface

IOutboundInterceptor covers sending for all message types:

| Operation | Hooks |
|---|---|
| Commands | OnCommandSending, OnCommandSent, OnCommandSendingError |
| Events | OnEventPublishing, OnEventPublished, OnEventPublishingError |
| Requests | OnRequestSending, OnRequestSent, OnRequestSendingError |
| Responses | OnResponseSending, OnResponseSent, OnResponseSendingError |
| Multicast Requests | OnMulticastRequestSending, OnMulticastRequestSent, OnMulticastRequestSendingError |
| Multicast Responses | OnMulticastResponseSending, OnMulticastResponseSent, OnMulticastResponseSendingError |

Registering Interceptors

Register interceptors globally on the bus using their type:

var bus = new BusBuilder()
    .Configure()
    .WithTransport(transport)
    .WithNames("OrderService", Environment.MachineName)
    .WithTypesFrom(typeProvider)
    .WithAutofacDefaults(container)
    .WithGlobalInboundInterceptorTypes(typeof(LoggingInterceptor))
    .WithGlobalOutboundInterceptorTypes(typeof(MetricsInterceptor))
    .Build();

You can register multiple interceptors of each type:

.WithGlobalInboundInterceptorTypes(
    typeof(LoggingInterceptor),
    typeof(TracingInterceptor),
    typeof(AuthorizationInterceptor))

Interceptors are resolved from your DI container, so they can have injected dependencies. Register them with your container before building the bus:

// With Autofac
builder.RegisterType<LoggingInterceptor>()
    .AsSelf()
    .InstancePerLifetimeScope();

builder.RegisterType<MetricsInterceptor>()
    .AsSelf()
    .InstancePerLifetimeScope();

Interceptor Priority

The Priority property controls the order in which interceptors run. Lower values run first on the way in and last on the way out (like middleware):

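Message arrives
    ↓
[Interceptor Priority=0  → Executing]
    ↓
[Interceptor Priority=10 → Executing]
    ↓
[Handler.Handle()]
    ↓
[Interceptor Priority=10 → Success/Error]
    ↓
[Interceptor Priority=0  → Success/Error]

For example, with the priorities below, AuthorizationInterceptor runs first on the way in, then LoggingInterceptor, then PerformanceInterceptor (hook methods omitted for brevity):

public class AuthorizationInterceptor : IInboundInterceptor
{
    public int Priority => -10; // Runs before logging

    // Hook methods omitted
}

public class LoggingInterceptor : IInboundInterceptor
{
    public int Priority => 0;

    // Hook methods omitted
}

public class PerformanceInterceptor : IInboundInterceptor
{
    public int Priority => 10; // Runs closest to handler

    // Hook methods omitted
}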
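The ordering rule can be made concrete with a small simulation. This is a sketch of the behaviour described above, not the Nimbus pipeline itself: `DemoInterceptor` and `PipelineDemo` are hypothetical stand-ins that only record the order in which hooks fire.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for an interceptor: a name, a priority, and hooks that log their call.
public class DemoInterceptor
{
    private readonly List<string> _log;
    public string Name { get; }
    public int Priority { get; }

    public DemoInterceptor(string name, int priority, List<string> log)
    {
        Name = name;
        Priority = priority;
        _log = log;
    }

    public Task OnExecuting() { _log.Add($"{Name} executing"); return Task.CompletedTask; }
    public Task OnSuccess() { _log.Add($"{Name} success"); return Task.CompletedTask; }
    public Task OnError() { _log.Add($"{Name} error"); return Task.CompletedTask; }
}

public static class PipelineDemo
{
    // Runs the handler between the interceptors and returns the recorded call order.
    public static async Task<List<string>> RunAsync(Func<Task> handler)
    {
        var log = new List<string>();
        var interceptors = new List<DemoInterceptor>
        {
            new DemoInterceptor("Logging", 0, log),
            new DemoInterceptor("Performance", 10, log),
            new DemoInterceptor("Authorization", -10, log),
        }.OrderBy(i => i.Priority).ToList(); // lowest priority value first

        // Way in: ascending priority.
        foreach (var interceptor in interceptors) await interceptor.OnExecuting();

        var failed = false;
        try { await handler(); log.Add("handler"); }
        catch (Exception) { failed = true; log.Add("handler threw"); }

        // Way out: reverse order, so the interceptor closest to the handler goes first.
        foreach (var interceptor in Enumerable.Reverse(interceptors))
        {
            if (failed) await interceptor.OnError();
            else await interceptor.OnSuccess();
        }
        return log;
    }

    public static async Task Main()
    {
        var log = await RunAsync(() => Task.CompletedTask);
        Console.WriteLine(string.Join(" -> ", log));
        // Authorization executing -> Logging executing -> Performance executing -> handler
        //   -> Performance success -> Logging success -> Authorization success
    }
}
```

Passing a handler that throws produces the same shape with the error hooks in place of the success hooks, again unwinding from priority 10 back to priority -10.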

Common Interceptor Patterns

Structured Logging with Correlation

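// Requires Serilog.Context (LogContext) and Serilog.Core.Enrichers (PropertyEnricher)
public class CorrelationInterceptor : IInboundInterceptor
{
    // Open logging scopes keyed by message ID, closed when handling finishes
    private readonly ConcurrentDictionary<Guid, IDisposable> _scopes = new();
    public int Priority => -100; // Run very early

    // Not async: AsyncLocal state set inside an async method is lost when it returns.
    // Assumes Nimbus then invokes the handler in this same execution context.
    public Task OnCommandHandlerExecuting<TCommand>(
        TCommand command, NimbusMessage message)
        where TCommand : IBusCommand
    {
        // Keep the scope open until handling ends; a using block would dispose it immediately.
        _scopes[message.MessageId] = LogContext.Push(
            new PropertyEnricher("CorrelationId", message.CorrelationId),
            new PropertyEnricher("MessageId", message.MessageId));
        return Task.CompletedTask;
    }

    public Task OnCommandHandlerSuccess<TCommand>(
        TCommand command, NimbusMessage message)
        where TCommand : IBusCommand => CloseScope(message);

    public Task OnCommandHandlerError<TCommand>(
        TCommand command, NimbusMessage message, Exception exception)
        where TCommand : IBusCommand => CloseScope(message);

    private Task CloseScope(NimbusMessage message)
    {
        if (_scopes.TryRemove(message.MessageId, out var scope)) scope.Dispose();
        return Task.CompletedTask;
    }
}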
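Logging contexts of this kind are typically backed by ambient `AsyncLocal<T>` state, so where the value is set matters. The sketch below uses only the .NET base class library (no Serilog or Nimbus) to show the general rule: a value set inside an `async` method is not visible to the caller once that method returns, whereas a value set in a plain `Task`-returning method is. `AsyncLocalDemo` and its member names are made up for the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLocalDemo
{
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // async method: the runtime restores the caller's execution context on return,
    // so this assignment does not leak back to the caller.
    private static async Task SetInAsyncMethod()
    {
        Ambient.Value = "set-in-async";
        await Task.Yield();
    }

    // Plain method returning a completed task: runs in the caller's context,
    // so the assignment stays visible to the caller.
    private static Task SetInPlainMethod()
    {
        Ambient.Value = "set-in-plain";
        return Task.CompletedTask;
    }

    public static async Task<(string AfterAsync, string AfterPlain)> RunAsync()
    {
        await SetInAsyncMethod();
        var afterAsync = Ambient.Value;  // still null here

        await SetInPlainMethod();
        var afterPlain = Ambient.Value;  // "set-in-plain"

        return (afterAsync, afterPlain);
    }

    public static async Task Main()
    {
        var (afterAsync, afterPlain) = await RunAsync();
        Console.WriteLine($"after async method: {afterAsync ?? "(null)"}");
        Console.WriteLine($"after plain method: {afterPlain}");
    }
}
```

For an interceptor, this means an Executing hook that pushes ambient state for the handler to see should not be declared `async`, and the pushed scope has to be disposed in the Success and Error hooks rather than in a `using` block inside the Executing hook.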

Performance Timing

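using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

public class TimingInterceptor : IInboundInterceptor
{
    private readonly IMetrics _metrics;
    // One stopwatch per in-flight message, keyed by message ID
    private readonly ConcurrentDictionary<Guid, Stopwatch> _timers = new();

    public int Priority => 0;

    // _metrics must be injected; without a constructor it would always be null
    public TimingInterceptor(IMetrics metrics)
    {
        _metrics = metrics;
    }

    public Task OnCommandHandlerExecuting<TCommand>(
        TCommand command, NimbusMessage message)
        where TCommand : IBusCommand
    {
        _timers[message.MessageId] = Stopwatch.StartNew();
        return Task.CompletedTask;
    }

    public Task OnCommandHandlerSuccess<TCommand>(
        TCommand command, NimbusMessage message)
        where TCommand : IBusCommand
    {
        // Report the elapsed time only for handlers that completed successfully
        if (_timers.TryRemove(message.MessageId, out var sw))
        {
            _metrics.Timing($"handler.{typeof(TCommand).Name}", sw.ElapsedMilliseconds);
        }
        return Task.CompletedTask;
    }

    public Task OnCommandHandlerError<TCommand>(
        TCommand command, NimbusMessage message, Exception ex)
        where TCommand : IBusCommand
    {
        // Discard the timer so failed messages do not accumulate in the dictionary
        _timers.TryRemove(message.MessageId, out _);
        return Task.CompletedTask;
    }
}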

Message Validation

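// IValidatable and ValidationException are assumed to be application-defined types.
// Requires System.Linq for ToList() and Any().
public class ValidationInterceptor : IInboundInterceptor
{
    public int Priority => -50;

    // Kept async so that a validation failure surfaces as a faulted task
    // rather than as an exception thrown synchronously from the hook.
    public async Task OnCommandHandlerExecuting<TCommand>(
        TCommand command, NimbusMessage message)
        where TCommand : IBusCommand
    {
        if (command is IValidatable validatable)
        {
            var errors = validatable.Validate().ToList();
            if (errors.Any())
            {
                throw new ValidationException(
                    $"Command {typeof(TCommand).Name} failed validation: {string.Join(", ", errors)}");
            }
        }
    }
}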
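The interceptor above relies on an `IValidatable` contract that is not shown. One possible shape, offered as an assumption rather than a Nimbus type, is an interface that returns a sequence of error messages. `PlaceOrderCommand` and its properties are hypothetical; in a real application it would also implement `IBusCommand`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical contract: an empty sequence means the message is valid.
public interface IValidatable
{
    IEnumerable<string> Validate();
}

// Hypothetical command; a real one would also implement IBusCommand.
public class PlaceOrderCommand : IValidatable
{
    public string CustomerId { get; set; } = "";
    public int Quantity { get; set; }

    public IEnumerable<string> Validate()
    {
        if (string.IsNullOrWhiteSpace(CustomerId))
            yield return "CustomerId is required";
        if (Quantity <= 0)
            yield return "Quantity must be positive";
    }
}

public static class ValidationDemo
{
    public static void Main()
    {
        var invalid = new PlaceOrderCommand { CustomerId = "", Quantity = 0 };
        var errors = invalid.Validate().ToList();
        Console.WriteLine(string.Join(", ", errors));
        // prints "CustomerId is required, Quantity must be positive"
    }
}
```

Returning every error, rather than stopping at the first, lets the interceptor report all of them in a single exception message.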

Next Steps