Transport Abstraction
How Nimbus achieves transport independence and allows switching between messaging infrastructures
Overview
One of Nimbus’s core design principles is transport independence. Your application code should not depend on the specific messaging infrastructure you’re using. This abstraction allows you to:
- Switch transports without changing application code
- Test with the in-process transport, deploy with Azure Service Bus
- Run different transports in different environments
- Migrate between messaging systems with minimal risk
The INimbusTransport Interface
All transports implement the INimbusTransport interface, which provides the core abstraction:
internal interface INimbusTransport
{
// Queue operations
INimbusMessageSender GetQueueSender(string queuePath);
INimbusMessageReceiver GetQueueReceiver(string queuePath);
// Topic/subscription operations
INimbusMessageSender GetTopicSender(string topicPath);
INimbusMessageReceiver GetTopicReceiver(
string topicPath,
string subscriptionName,
IFilterCondition filter);
}
This simple interface hides the complexity of different messaging systems behind a unified API.
Message Senders
The INimbusMessageSender interface abstracts message sending:
internal interface INimbusMessageSender
{
Task Send(NimbusMessage message);
Task SendBatch(IEnumerable<NimbusMessage> messages);
}
Implementation Examples
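Each transport implements this differently. The excerpts below show only Send; constructors, helper fields such as _mapper and _serializer, and SendBatch are omitted for brevity: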
Azure Service Bus:
public class AzureServiceBusQueueSender : INimbusMessageSender
{
private readonly QueueClient _queueClient;
public async Task Send(NimbusMessage message)
{
var brokeredMessage = _mapper.MapToNativeMessage(message);
await _queueClient.SendAsync(brokeredMessage);
}
}
Redis:
public class RedisQueueSender : INimbusMessageSender
{
private readonly IDatabase _redis;
public async Task Send(NimbusMessage message)
{
var serialized = _serializer.Serialize(message);
await _redis.ListLeftPushAsync(queueName, serialized);
}
}
In-Process:
public class InProcessQueueSender : INimbusMessageSender
{
private readonly BlockingCollection<NimbusMessage> _queue;
public Task Send(NimbusMessage message)
{
// Enqueueing is synchronous, so return a completed task instead of using async/await
_queue.Add(message);
return Task.CompletedTask;
}
}
Notice how different the implementations are, yet they all satisfy the same interface. Your application code doesn’t need to know these details.
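The sender excerpts above leave out SendBatch, which the interface also requires. The following is a minimal sketch of a sender that covers both members. It assumes trimmed stand-in versions of NimbusMessage and INimbusMessageSender so that it compiles on its own, and InMemoryQueueSender is a hypothetical name, not a class that ships with Nimbus.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in types so the sketch compiles on its own; the real Nimbus types have more members.
public class NimbusMessage
{
    public Guid MessageId { get; set; } = Guid.NewGuid();
    public object Payload { get; set; }
}

public interface INimbusMessageSender
{
    Task Send(NimbusMessage message);
    Task SendBatch(IEnumerable<NimbusMessage> messages);
}

// Hypothetical in-memory sender that implements both interface members.
public class InMemoryQueueSender : INimbusMessageSender
{
    private readonly BlockingCollection<NimbusMessage> _queue;

    public InMemoryQueueSender(BlockingCollection<NimbusMessage> queue)
    {
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    public Task Send(NimbusMessage message)
    {
        // Add blocks only when the collection has a bounded capacity and is full.
        _queue.Add(message);
        return Task.CompletedTask;
    }

    public Task SendBatch(IEnumerable<NimbusMessage> messages)
    {
        // There is no native batching here, so messages are enqueued one at a time, in order.
        foreach (var message in messages)
        {
            _queue.Add(message);
        }
        return Task.CompletedTask;
    }
}
```

A broker-backed transport would more likely hand the whole batch to the broker client in one call. The loop here only shows that both members must exist for the class to satisfy the interface.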
Message Receivers
The INimbusMessageReceiver interface abstracts message receiving:
internal interface INimbusMessageReceiver
{
event EventHandler<MessageReceivedEventArgs> MessageReceived;
Task Start();
Task Stop();
void DiscardMessage(NimbusMessage message);
Task AcknowledgeMessage(NimbusMessage message);
}
Event-Based Model
Receivers use an event-based model for message delivery:
var receiver = transport.GetQueueReceiver("myqueue");
receiver.MessageReceived += async (sender, args) =>
{
var message = args.Message;
try
{
// Process the message
await ProcessMessage(message);
// Acknowledge successful processing
await receiver.AcknowledgeMessage(message);
}
catch (Exception)
{
// Discard or dead-letter the message
receiver.DiscardMessage(message);
}
};
await receiver.Start();
Message Pumps
Each transport implements a message pump that:
- Continuously polls/listens for messages
- Raises the MessageReceived event
- Handles acknowledgments and retries
- Manages connection health
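The responsibilities listed above can be sketched as a simple loop. This is an illustration over an in-memory queue, not the pump of any real Nimbus transport: InMemoryMessagePump, the handler and dead-letter callbacks, and the default of three delivery attempts are all assumptions, and NimbusMessage is a trimmed stand-in.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Stand-in envelope with only the fields the pump needs.
public class NimbusMessage
{
    public Guid MessageId { get; set; } = Guid.NewGuid();
    public int DeliveryAttempts { get; set; }
    public object Payload { get; set; }
}

// Hypothetical pump over an in-memory queue; a real transport would wrap a broker client.
public class InMemoryMessagePump
{
    private readonly BlockingCollection<NimbusMessage> _queue;
    private readonly Func<NimbusMessage, Task> _handler;
    private readonly Action<NimbusMessage> _deadLetter;
    private readonly int _maxDeliveryAttempts;

    public InMemoryMessagePump(
        BlockingCollection<NimbusMessage> queue,
        Func<NimbusMessage, Task> handler,
        Action<NimbusMessage> deadLetter,
        int maxDeliveryAttempts = 3)
    {
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
        _deadLetter = deadLetter ?? throw new ArgumentNullException(nameof(deadLetter));
        _maxDeliveryAttempts = maxDeliveryAttempts;
    }

    // Runs until the token is cancelled. Take blocks the calling thread,
    // so start this with Task.Run rather than on a UI or request thread.
    public async Task Run(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            NimbusMessage message;
            try
            {
                // Waits until a message is available or the token is cancelled.
                message = _queue.Take(cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            message.DeliveryAttempts++;
            try
            {
                await _handler(message);
                // Success: Take already removed the message, so nothing is left to acknowledge.
            }
            catch (Exception)
            {
                if (message.DeliveryAttempts < _maxDeliveryAttempts)
                {
                    _queue.Add(message);   // put it back for another attempt
                }
                else
                {
                    _deadLetter(message);  // give up after the configured number of attempts
                }
            }
        }
    }
}
```

Connection health management is left out because an in-memory queue has no connection to monitor.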
The NimbusMessage Envelope
All messages are wrapped in a transport-agnostic NimbusMessage envelope:
public class NimbusMessage
{
// Identity
public Guid MessageId { get; set; }
public Guid CorrelationId { get; set; }
// Routing
public string From { get; set; }
public string To { get; set; }
public string DeliverTo { get; set; }
// Scheduling
public DateTimeOffset? DeliverAfter { get; set; }
public DateTimeOffset? ExpiresAfter { get; set; }
// Retry handling
public int DeliveryAttempts { get; set; }
// Extensibility
public Dictionary<string, object> Properties { get; set; }
// Payload
public object Payload { get; set; }
}
Native Message Mapping
Each transport maps NimbusMessage to its native format:
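Azure Service Bus → Message:
var brokeredMessage = new Message
{
    MessageId = nimbusMessage.MessageId.ToString(),
    CorrelationId = nimbusMessage.CorrelationId.ToString(),
    Body = SerializePayload(nimbusMessage.Payload),
};
// DeliverAfter and ExpiresAfter are nullable, so set the native fields only when a value is present
if (nimbusMessage.DeliverAfter.HasValue)
{
    brokeredMessage.ScheduledEnqueueTimeUtc = nimbusMessage.DeliverAfter.Value.UtcDateTime;
}
if (nimbusMessage.ExpiresAfter.HasValue)
{
    brokeredMessage.TimeToLive = nimbusMessage.ExpiresAfter.Value - DateTimeOffset.UtcNow;
}
foreach (var prop in nimbusMessage.Properties)
{
    brokeredMessage.UserProperties.Add(prop.Key, prop.Value);
}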
Redis → JSON String:
var redisMessage = JsonConvert.SerializeObject(new
{
MessageId = nimbusMessage.MessageId,
CorrelationId = nimbusMessage.CorrelationId,
Payload = nimbusMessage.Payload,
Properties = nimbusMessage.Properties,
// ... other fields
});
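Because Payload is typed as object, a JSON mapping also has to tell the receiving side which type to rebuild. The sketch below shows one possible way to do that: store the payload's type name next to its JSON. It uses System.Text.Json rather than the Json.NET call shown above so that it needs no extra package, and WireEnvelope, EnvelopeCodec, and OrderPlaced are hypothetical names, not part of Nimbus.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload type used for the round trip.
public class OrderPlaced
{
    public string OrderId { get; set; }
}

// Hypothetical wire format: the payload travels as nested JSON plus its type name.
public class WireEnvelope
{
    public Guid MessageId { get; set; }
    public Guid CorrelationId { get; set; }
    public string PayloadType { get; set; }
    public string PayloadJson { get; set; }
}

public static class EnvelopeCodec
{
    public static string Encode(Guid messageId, Guid correlationId, object payload)
    {
        var envelope = new WireEnvelope
        {
            MessageId = messageId,
            CorrelationId = correlationId,
            PayloadType = payload.GetType().AssemblyQualifiedName,
            // Serialize with the runtime type so derived-type properties are included.
            PayloadJson = JsonSerializer.Serialize(payload, payload.GetType()),
        };
        return JsonSerializer.Serialize(envelope);
    }

    public static (WireEnvelope Envelope, object Payload) Decode(string json)
    {
        var envelope = JsonSerializer.Deserialize<WireEnvelope>(json)
            ?? throw new JsonException("The message body did not contain an envelope.");

        // Resolving a type named inside the message is only safe for trusted senders;
        // a hardened version would check the name against an allow-list first.
        var payloadType = Type.GetType(envelope.PayloadType, throwOnError: true);
        var payload = JsonSerializer.Deserialize(envelope.PayloadJson, payloadType);
        return (envelope, payload);
    }
}
```

Decoding the result of Encode returns an OrderPlaced instance again rather than an untyped JSON element, which is what a handler on the receiving side needs.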
Transport Configuration
Each transport has its own configuration class:
Azure Service Bus
var transport = new AzureServiceBusTransportConfiguration()
.WithConnectionString(connectionString)
.WithDefaultTimeout(TimeSpan.FromSeconds(30))
.WithMaxConcurrentCalls(10);
Redis
var transport = new RedisTransportConfiguration()
.WithConnectionString("localhost:6379")
.WithDatabase(0)
.WithPollingInterval(TimeSpan.FromMilliseconds(100));
AMQP (ActiveMQ Artemis)
var transport = new AMQPTransportConfiguration()
.WithBrokerUri("amqp://localhost:5672")
.WithUsername("admin")
.WithPassword("admin");
In-Process
var transport = new InProcessTransportConfiguration()
.WithMaximumQueueLength(1000);
Transport Features Matrix
Different transports support different features:
| Feature | Azure Service Bus | Redis | AMQP | In-Process |
|---|---|---|---|---|
| Queues | ✅ | ✅ | ✅ | ✅ |
| Topics | ✅ | ✅ | ✅ | ✅ |
| Scheduled Delivery | ✅ | ⚠️ Limited | ⚠️ Limited | ✅ |
| Message TTL | ✅ | ✅ | ✅ | ✅ |
| Dead Letter Queue | ✅ | ⚠️ Manual | ✅ | ⚠️ Manual |
| Transactions | ✅ | ⚠️ Limited | ✅ | ✅ |
| Large Messages | ⚠️ 256KB | ⚠️ 512MB | ⚠️ Limited | ✅ Unlimited |
| Persistence | ✅ | ✅ | ✅ | ❌ |
Feature Compatibility: While Nimbus provides a unified API, some features may work differently or not be available on all transports. Always test your application against the target transport.
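One way to act on the matrix is to check a message against the target transport's limits before sending. The sketch below encodes only the size limits listed in the Large Messages row. TransportLimits is a hypothetical helper, the "InProcess" key is an assumed name, and the real limits may vary with service tier and per-message overhead, so treat the numbers as placeholders.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; not part of Nimbus.
public static class TransportLimits
{
    // Sizes come from the Large Messages row of the matrix; null means no limit is listed.
    // AMQP is left out because the matrix gives no concrete figure for it.
    private static readonly Dictionary<string, long?> MaxMessageBytes = new Dictionary<string, long?>
    {
        ["AzureServiceBus"] = 256L * 1024,
        ["Redis"] = 512L * 1024 * 1024,
        ["InProcess"] = null,
    };

    public static bool Fits(string transportType, long serializedSizeBytes)
    {
        if (!MaxMessageBytes.TryGetValue(transportType, out var limit))
        {
            throw new NotSupportedException($"Unknown transport type: {transportType}");
        }
        return limit == null || serializedSizeBytes <= limit.Value;
    }
}
```

A 10 MB payload would pass this check for Redis and the in-process transport but fail it for Azure Service Bus, which is the kind of difference worth catching in a test rather than in production.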
How Abstraction Works in Practice
1. Application Code Stays the Same
// This code works with ANY transport
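public class OrderService
{
    private readonly IBus _bus;

    // The bus is injected, so the service never sees which transport is behind it
    public OrderService(IBus bus)
    {
        _bus = bus;
    }

    public async Task PlaceOrder(Order order)
    {
        await _bus.Send(new PlaceOrderCommand
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId
        });
    }
}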
2. Only Configuration Changes
Development (In-Process):
var bus = new BusBuilder()
.Configure()
.WithTransport(new InProcessTransportConfiguration())
.WithNames("OrderService", Environment.MachineName)
.Build();
Staging (Redis):
var bus = new BusBuilder()
.Configure()
.WithTransport(new RedisTransportConfiguration()
.WithConnectionString("redis-staging.example.com"))
.WithNames("OrderService", Environment.MachineName)
.Build();
Production (Azure Service Bus):
var bus = new BusBuilder()
.Configure()
.WithTransport(new AzureServiceBusTransportConfiguration()
.WithConnectionString(azureConnectionString))
.WithNames("OrderService", Environment.MachineName)
.Build();
Benefits of Transport Abstraction
1. Easy Testing
Use the in-process transport for unit and integration tests:
[Test]
public async Task OrderService_Should_SendCommand()
{
// Arrange
var transport = new InProcessTransportConfiguration();
var bus = new BusBuilder()
.Configure()
.WithTransport(transport)
.Build();
var orderService = new OrderService(bus);
// Act
await orderService.PlaceOrder(new Order { Id = "123" });
// Assert
var messages = transport.GetSentMessages();
Assert.That(messages, Has.Count.EqualTo(1));
}
2. Gradual Migration
Migrate from one transport to another service-by-service:
[Old System - AMQP/Artemis]
↕
[Bridge Service]
↕
[New System - Azure Service Bus]
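The bridge service in the diagram could look roughly like the sketch below: it listens on a receiver from the old transport and re-sends each message through a sender on the new one. TransportBridge is a hypothetical class, the interfaces are trimmed stand-ins for the ones shown earlier, and FakeReceiver and RecordingSender exist only so the bridge can be exercised without a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Trimmed stand-ins for the Nimbus types shown earlier, so the sketch compiles on its own.
public class NimbusMessage
{
    public Guid MessageId { get; set; } = Guid.NewGuid();
    public object Payload { get; set; }
}

public class MessageReceivedEventArgs : EventArgs
{
    public MessageReceivedEventArgs(NimbusMessage message) { Message = message; }
    public NimbusMessage Message { get; }
}

public interface INimbusMessageSender
{
    Task Send(NimbusMessage message);
}

public interface INimbusMessageReceiver
{
    event EventHandler<MessageReceivedEventArgs> MessageReceived;
    Task Start();
    Task Stop();
    void DiscardMessage(NimbusMessage message);
    Task AcknowledgeMessage(NimbusMessage message);
}

// Hypothetical bridge: receives from the old transport and re-sends on the new one.
public class TransportBridge
{
    private readonly INimbusMessageReceiver _source;
    private readonly INimbusMessageSender _destination;

    public TransportBridge(INimbusMessageReceiver source, INimbusMessageSender destination)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
        _destination = destination ?? throw new ArgumentNullException(nameof(destination));
    }

    public Task Start()
    {
        _source.MessageReceived += OnMessageReceived;
        return _source.Start();
    }

    public async Task Stop()
    {
        await _source.Stop();
        _source.MessageReceived -= OnMessageReceived;
    }

    // Event handlers must return void, so every exception is caught inside the handler.
    private async void OnMessageReceived(object sender, MessageReceivedEventArgs args)
    {
        try
        {
            await _destination.Send(args.Message);
            // Acknowledge on the source only after the destination accepted the message.
            await _source.AcknowledgeMessage(args.Message);
        }
        catch (Exception)
        {
            // What happens to a discarded message is transport-specific.
            _source.DiscardMessage(args.Message);
        }
    }
}

// In-memory fakes, present only to exercise the bridge without a broker.
public class FakeReceiver : INimbusMessageReceiver
{
    public event EventHandler<MessageReceivedEventArgs> MessageReceived;
    public int Acknowledged { get; private set; }
    public int Discarded { get; private set; }
    public Task Start() => Task.CompletedTask;
    public Task Stop() => Task.CompletedTask;
    public void DiscardMessage(NimbusMessage message) => Discarded++;
    public Task AcknowledgeMessage(NimbusMessage message) { Acknowledged++; return Task.CompletedTask; }
    public void Deliver(NimbusMessage message) =>
        MessageReceived?.Invoke(this, new MessageReceivedEventArgs(message));
}

public class RecordingSender : INimbusMessageSender
{
    public List<NimbusMessage> Sent { get; } = new List<NimbusMessage>();
    public Task Send(NimbusMessage message) { Sent.Add(message); return Task.CompletedTask; }
}
```

Acknowledging only after the forward succeeds means a crash between the two steps can deliver a message twice but should not lose it, so handlers on the new side are safer if they tolerate duplicates.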
3. Multi-Transport Scenarios
Use different transports for different purposes:
// Critical messages → Azure Service Bus (guaranteed delivery)
var criticalBus = new BusBuilder()
.Configure()
.WithTransport(azureServiceBusTransport)
.Build();
// High-volume events → Redis (fast, less overhead)
var eventBus = new BusBuilder()
.Configure()
.WithTransport(redisTransport)
.Build();
4. Vendor Independence
Avoid vendor lock-in. You can switch transports without rewriting your application if your messaging provider:
- Increases prices significantly
- Deprecates features you depend on
- Has reliability issues
- Doesn’t meet performance requirements
Transport Implementation Details
Queue vs. Topic Abstraction
Queues (Point-to-Point):
- One message → One consumer
- Load balancing across consumers
- Used for commands and requests
Topics (Pub/Sub):
- One message → Many subscribers
- Each subscription gets a copy
- Used for events
// Nimbus automatically chooses queue vs. topic based on message type
await bus.Send(command); // → Queue
await bus.Publish(@event); // → Topic
await bus.Request(request); // → Queue (with reply-to)
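The choice between queue and topic can be pictured as a lookup on the message's marker interface. The sketch below is an assumption about the general shape of such a rule, not Nimbus's actual router: DestinationResolver is hypothetical, IBusCommand and IBusEvent are empty stand-ins, and the "q." and "t." path prefixes are invented for illustration.

```csharp
using System;

// Empty stand-ins for the marker interfaces; the real ones live in Nimbus.
public interface IBusCommand { }
public interface IBusEvent { }

public enum Destination { Queue, Topic }

// Hypothetical resolver; the path format is invented for this sketch.
public static class DestinationResolver
{
    public static (Destination Kind, string Path) Resolve(Type messageType)
    {
        // Commands have exactly one logical handler, so they go to a queue.
        if (typeof(IBusCommand).IsAssignableFrom(messageType))
        {
            return (Destination.Queue, "q." + messageType.Name.ToLowerInvariant());
        }

        // Events may have many subscribers, so they go to a topic.
        if (typeof(IBusEvent).IsAssignableFrom(messageType))
        {
            return (Destination.Topic, "t." + messageType.Name.ToLowerInvariant());
        }

        throw new ArgumentException(
            $"{messageType.Name} is neither a command nor an event.", nameof(messageType));
    }
}

// Example message types.
public class PlaceOrderCommand : IBusCommand { }
public class OrderPlacedEvent : IBusEvent { }
```

Requests would follow the queue branch as well, with a reply-to address added to the envelope; that part is omitted here.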
Connection Management
Transports handle connection pooling and resilience:
public class AzureServiceBusTransport : INimbusTransport
{
    private readonly string _connectionString;
    private readonly RetryPolicy _retryPolicy;
    // One client per queue path, created on first use and reused afterwards
    private readonly ConcurrentDictionary<string, QueueClient> _queueClients =
        new ConcurrentDictionary<string, QueueClient>();

    public INimbusMessageSender GetQueueSender(string queuePath)
    {
        var client = _queueClients.GetOrAdd(queuePath, path =>
            new QueueClient(_connectionString, path, retryPolicy: _retryPolicy));
        return new AzureServiceBusQueueSender(client);
    }

    // Constructor, receivers, and topic methods omitted for brevity
}
Custom Transport Implementation
You can implement your own transport for specialized scenarios:
public class CustomTransport : INimbusTransport
{
public INimbusMessageSender GetQueueSender(string queuePath)
{
return new CustomQueueSender(queuePath);
}
public INimbusMessageReceiver GetQueueReceiver(string queuePath)
{
return new CustomQueueReceiver(queuePath);
}
// Implement topic methods...
}
Then use it in your configuration:
var bus = new BusBuilder()
.Configure()
.WithTransport(new CustomTransport())
.Build();
Creating Custom Transports: Custom transports are useful for integrating with proprietary messaging systems, for test scenarios, or for specialized requirements such as file-system-based messaging.
Best Practices
1. Design for the Lowest Common Denominator
Not all transports support all features. Design your messages to work with basic capabilities:
// ✅ GOOD: Works everywhere
public class SimpleCommand : IBusCommand
{
public string Id { get; set; }
public string Data { get; set; }
}
// ⚠️ CAREFUL: Requires large message support
public class LargeCommand : IBusCommand
{
public byte[] BigData { get; set; } // 10MB file
}
2. Abstract Transport Configuration
Don’t hardcode transport configuration. Use dependency injection and configuration files:
public class MessagingConfig
{
public string TransportType { get; set; }
public string ConnectionString { get; set; }
}
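// Assumes the transport configuration classes share a TransportConfiguration base type,
// which is what WithTransport(...) is given in the examples above
public static TransportConfiguration CreateTransport(MessagingConfig config)
{
    return config.TransportType switch
    {
        "AzureServiceBus" => new AzureServiceBusTransportConfiguration()
            .WithConnectionString(config.ConnectionString),
        "Redis" => new RedisTransportConfiguration()
            .WithConnectionString(config.ConnectionString),
        _ => throw new NotSupportedException(
            $"Unknown transport type: {config.TransportType}")
    };
}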
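To keep the transport choice out of the code entirely, the MessagingConfig values can come from the environment. The sketch below is one possible loader: MessagingConfigLoader and both environment variable names are hypothetical, and the fallback to an "InProcess" transport type is an assumption about what a sensible local default would be.

```csharp
using System;

public class MessagingConfig
{
    public string TransportType { get; set; }
    public string ConnectionString { get; set; }
}

// Hypothetical loader; not part of Nimbus.
public static class MessagingConfigLoader
{
    // Hypothetical variable names; pick whatever fits your deployment tooling.
    public const string TransportTypeVariable = "MESSAGING_TRANSPORT";
    public const string ConnectionStringVariable = "MESSAGING_CONNECTION_STRING";

    // getVariable is injectable so the loader can be tested without touching the real environment.
    public static MessagingConfig FromEnvironment(Func<string, string> getVariable = null)
    {
        if (getVariable == null)
        {
            getVariable = name => Environment.GetEnvironmentVariable(name);
        }

        var config = new MessagingConfig
        {
            // Fall back to the in-process transport when nothing is configured.
            TransportType = getVariable(TransportTypeVariable) ?? "InProcess",
            ConnectionString = getVariable(ConnectionStringVariable),
        };

        // Every transport except the in-process one needs somewhere to connect to.
        if (config.TransportType != "InProcess" && string.IsNullOrWhiteSpace(config.ConnectionString))
        {
            throw new InvalidOperationException(
                $"{ConnectionStringVariable} must be set when {TransportTypeVariable} is '{config.TransportType}'.");
        }

        return config;
    }
}
```

With this loader, the CreateTransport switch would need an additional "InProcess" arm so that the default value maps to an in-process configuration. Failing at startup on a missing connection string is usually easier to diagnose than a connection error on the first send.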
3. Test Against Target Transport
Always run integration tests against the same transport you use in production:
[Test]
[Category("Integration")]
public async Task EndToEnd_With_AzureServiceBus()
{
var transport = new AzureServiceBusTransportConfiguration()
.WithConnectionString(testConnectionString);
// Test with actual Azure Service Bus
}
Next Steps
- Routing - Learn how messages are routed to queues and topics
- Transports Overview - Explore available transports in detail
- Dependency Injection - Understand DI integration