Interceptors
Add cross-cutting concerns to Nimbus message handling using inbound and outbound interceptors
Overview
Interceptors allow you to add cross-cutting concerns — logging, metrics, tracing, validation, auditing — to message handling without modifying your handlers. Nimbus provides two interceptor points:
- Inbound interceptors: Run before and after a handler processes a message
- Outbound interceptors: Run before and after a message is sent or published
Both types support hooks for the executing, success, and error phases of each message operation.
Inbound Interceptors
Inbound interceptors implement IInboundInterceptor and wrap handler execution:
using Nimbus.Interceptors.Inbound;
using Nimbus.Infrastructure;
using Nimbus.MessageContracts;
public class LoggingInterceptor : IInboundInterceptor
{
private readonly ILogger _logger;
public int Priority => 0; // Lower = runs first
public LoggingInterceptor(ILogger logger)
{
_logger = logger;
}
public async Task OnCommandHandlerExecuting<TBusCommand>(
TBusCommand busCommand, NimbusMessage nimbusMessage)
where TBusCommand : IBusCommand
{
_logger.Information("Handling {CommandType} [{MessageId}]",
typeof(TBusCommand).Name, nimbusMessage.MessageId);
}
public async Task OnCommandHandlerSuccess<TBusCommand>(
TBusCommand busCommand, NimbusMessage nimbusMessage)
where TBusCommand : IBusCommand
{
_logger.Information("Handled {CommandType} successfully [{MessageId}]",
typeof(TBusCommand).Name, nimbusMessage.MessageId);
}
public async Task OnCommandHandlerError<TBusCommand>(
TBusCommand busCommand, NimbusMessage nimbusMessage, Exception exception)
where TBusCommand : IBusCommand
{
_logger.Error(exception, "Error handling {CommandType} [{MessageId}]",
typeof(TBusCommand).Name, nimbusMessage.MessageId);
}
// Implement similar methods for events, requests, and multicast requests...
}
Full Interface
IInboundInterceptor has hook methods for all four message pattern types:
| Pattern | Hooks |
|---|---|
| Commands | OnCommandHandlerExecuting, OnCommandHandlerSuccess, OnCommandHandlerError |
| Events | OnEventHandlerExecuting, OnEventHandlerSuccess, OnEventHandlerError |
| Requests | OnRequestHandlerExecuting, OnRequestHandlerSuccess, OnRequestHandlerError |
| Multicast Requests | OnMulticastRequestHandlerExecuting, OnMulticastRequestHandlerSuccess, OnMulticastRequestHandlerError |
You only need to implement the methods relevant to your interceptor. Provide empty implementations for the rest — consider creating a base class with no-op implementations to reduce boilerplate.
Outbound Interceptors
Outbound interceptors implement IOutboundInterceptor and wrap message sending:
using Nimbus.Interceptors.Outbound;
public class MetricsInterceptor : IOutboundInterceptor
{
private readonly IMetrics _metrics;
public int Priority => 0;
public MetricsInterceptor(IMetrics metrics)
{
_metrics = metrics;
}
public async Task OnCommandSending<TBusCommand>(
TBusCommand busCommand, NimbusMessage nimbusMessage)
where TBusCommand : IBusCommand
{
_metrics.Increment($"commands.sending.{typeof(TBusCommand).Name}");
}
public async Task OnCommandSent<TBusCommand>(
TBusCommand busCommand, NimbusMessage nimbusMessage)
where TBusCommand : IBusCommand
{
_metrics.Increment($"commands.sent.{typeof(TBusCommand).Name}");
}
public async Task OnCommandSendingError<TBusCommand>(
TBusCommand busCommand, NimbusMessage nimbusMessage, Exception exception)
where TBusCommand : IBusCommand
{
_metrics.Increment($"commands.error.{typeof(TBusCommand).Name}");
}
// Implement similar methods for events, requests, and multicast requests...
}
Full Interface
IOutboundInterceptor covers sending for all message types:
| Operation | Hooks |
|---|---|
| Commands | OnCommandSending, OnCommandSent, OnCommandSendingError |
| Events | OnEventPublishing, OnEventPublished, OnEventPublishingError |
| Requests | OnRequestSending, OnRequestSent, OnRequestSendingError |
| Responses | OnResponseSending, OnResponseSent, OnResponseSendingError |
| Multicast Requests | OnMulticastRequestSending, OnMulticastRequestSent, OnMulticastRequestSendingError |
| Multicast Responses | OnMulticastResponseSending, OnMulticastResponseSent, OnMulticastResponseSendingError |
Registering Interceptors
Register interceptors globally on the bus using their type:
var bus = new BusBuilder()
.Configure()
.WithTransport(transport)
.WithNames("OrderService", Environment.MachineName)
.WithTypesFrom(typeProvider)
.WithAutofacDefaults(container)
.WithGlobalInboundInterceptorTypes(typeof(LoggingInterceptor))
.WithGlobalOutboundInterceptorTypes(typeof(MetricsInterceptor))
.Build();
You can register multiple interceptors of each type:
.WithGlobalInboundInterceptorTypes(
typeof(LoggingInterceptor),
typeof(TracingInterceptor),
typeof(AuthorizationInterceptor))
Interceptors are resolved from your DI container, so they can have injected dependencies. Register them with your container before building the bus:
// With Autofac
builder.RegisterType<LoggingInterceptor>()
.AsSelf()
.InstancePerLifetimeScope();
builder.RegisterType<MetricsInterceptor>()
.AsSelf()
.InstancePerLifetimeScope();
Interceptor Priority
The Priority property controls the order interceptors run. Lower values run first on the way in, last on the way out (like middleware):
Message arrives
│
▼
[Interceptor Priority=0 → Executing]
│
▼
[Interceptor Priority=10 → Executing]
│
▼
[Handler.Handle()]
│
▼
[Interceptor Priority=10 → Success/Error]
│
▼
[Interceptor Priority=0 → Success/Error]
public class AuthorizationInterceptor : IInboundInterceptor
{
public int Priority => -10; // Runs before logging
}
public class LoggingInterceptor : IInboundInterceptor
{
public int Priority => 0;
}
public class PerformanceInterceptor : IInboundInterceptor
{
public int Priority => 10; // Runs closest to handler
}
Common Interceptor Patterns
Structured Logging with Correlation
public class CorrelationInterceptor : IInboundInterceptor
{
public int Priority => -100; // Run very early
public async Task OnCommandHandlerExecuting<TCommand>(
TCommand command, NimbusMessage message)
where TCommand : IBusCommand
{
// Push correlation ID into the logging context
using (LogContext.PushProperty("CorrelationId", message.CorrelationId))
using (LogContext.PushProperty("MessageId", message.MessageId))
{
// The using scope extends through the rest of the pipeline
// because interceptors are called synchronously in order
}
}
}
Performance Timing
public class TimingInterceptor : IInboundInterceptor
{
private readonly IMetrics _metrics;
private readonly ConcurrentDictionary<Guid, Stopwatch> _timers = new();
public int Priority => 0;
public async Task OnCommandHandlerExecuting<TCommand>(
TCommand command, NimbusMessage message)
where TCommand : IBusCommand
{
_timers[message.MessageId] = Stopwatch.StartNew();
}
public async Task OnCommandHandlerSuccess<TCommand>(
TCommand command, NimbusMessage message)
where TCommand : IBusCommand
{
if (_timers.TryRemove(message.MessageId, out var sw))
{
_metrics.Timing($"handler.{typeof(TCommand).Name}", sw.ElapsedMilliseconds);
}
}
public async Task OnCommandHandlerError<TCommand>(
TCommand command, NimbusMessage message, Exception ex)
where TCommand : IBusCommand
{
_timers.TryRemove(message.MessageId, out _);
}
}
Message Validation
public class ValidationInterceptor : IInboundInterceptor
{
public int Priority => -50;
public async Task OnCommandHandlerExecuting<TCommand>(
TCommand command, NimbusMessage message)
where TCommand : IBusCommand
{
if (command is IValidatable validatable)
{
var errors = validatable.Validate().ToList();
if (errors.Any())
{
throw new ValidationException(
$"Command {typeof(TCommand).Name} failed validation: {string.Join(", ", errors)}");
}
}
}
}
Next Steps
- Error Handling — dead letter queues and retry policies
- Dependency Injection — how interceptors are resolved from your container
- Guides: Logging — setting up structured logging with Serilog