Transport Abstraction

How Nimbus achieves transport independence and allows switching between messaging infrastructures

Overview

One of Nimbus’s core design principles is transport independence. Your application code should not depend on the specific messaging infrastructure you’re using. This abstraction allows you to:

  • Switch transports without changing application code
  • Test with in-process transport, deploy with Azure Service Bus
  • Run different transports in different environments
  • Migrate between messaging systems with minimal risk

The INimbusTransport Interface

All transports implement the INimbusTransport interface, which provides the core abstraction:

internal interface INimbusTransport
{
    // Queue operations
    INimbusMessageSender GetQueueSender(string queuePath);
    INimbusMessageReceiver GetQueueReceiver(string queuePath);

    // Topic/subscription operations
    INimbusMessageSender GetTopicSender(string topicPath);
    INimbusMessageReceiver GetTopicReceiver(
        string topicPath,
        string subscriptionName,
        IFilterCondition filter);
}

This simple interface hides the complexity of different messaging systems behind a unified API.

Message Senders

The INimbusMessageSender interface abstracts message sending:

internal interface INimbusMessageSender
{
    Task Send(NimbusMessage message);
    Task SendBatch(IEnumerable<NimbusMessage> messages);
}

Implementation Examples

Each transport implements this differently:

Azure Service Bus:

public class AzureServiceBusQueueSender : INimbusMessageSender
{
    private readonly QueueClient _queueClient;
    // _mapper (declaration omitted) converts a NimbusMessage into the native message type

    public async Task Send(NimbusMessage message)
    {
        var brokeredMessage = _mapper.MapToNativeMessage(message);
        await _queueClient.SendAsync(brokeredMessage);
    }

    // SendBatch omitted for brevity
}

Redis:

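public class RedisQueueSender : INimbusMessageSender
{
    private readonly IDatabase _redis;
    private readonly string _queueName;
    // _serializer (declaration omitted) turns the envelope into a Redis value

    public async Task Send(NimbusMessage message)
    {
        var serialized = _serializer.Serialize(message);
        await _redis.ListLeftPushAsync(_queueName, serialized);
    }

    // SendBatch omitted for brevity
}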

In-Process:

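public class InProcessQueueSender : INimbusMessageSender
{
    private readonly BlockingCollection<NimbusMessage> _queue;

    public Task Send(NimbusMessage message)
    {
        // Add is synchronous, so return an already-completed task rather than using async/await
        _queue.Add(message);
        return Task.CompletedTask;
    }

    // SendBatch omitted for brevity
}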

Notice how different the implementations are, yet they all satisfy the same interface. Your application code doesn’t need to know these details.

Message Receivers

The INimbusMessageReceiver interface abstracts message receiving:

internal interface INimbusMessageReceiver
{
    event EventHandler<MessageReceivedEventArgs> MessageReceived;

    Task Start();
    Task Stop();

    void DiscardMessage(NimbusMessage message);
    Task AcknowledgeMessage(NimbusMessage message);
}

Event-Based Model

Receivers use an event-based model for message delivery:

var receiver = transport.GetQueueReceiver("myqueue");

receiver.MessageReceived += async (sender, args) =>
{
    var message = args.Message;

    try
    {
        // Process the message
        await ProcessMessage(message);

        // Acknowledge successful processing
        await receiver.AcknowledgeMessage(message);
    }
    catch (Exception)
    {
        // Discard or dead-letter the message
        receiver.DiscardMessage(message);
    }
};

await receiver.Start();

Message Pumps

Each transport implements a message pump that:

  1. Continuously polls/listens for messages
  2. Raises the MessageReceived event
  3. Handles acknowledgments and retries
  4. Manages connection health
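
The first two steps of that list can be sketched with an in-memory source. This is a simplified illustration and not Nimbus's actual pump: the class name SketchMessagePump is hypothetical, and acknowledgments, retries, and connection health (steps 3 and 4) are deliberately left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified pump for illustration only.
public sealed class SketchMessagePump<T>
{
    private readonly BlockingCollection<T> _source;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _loop = Task.CompletedTask;

    public SketchMessagePump(BlockingCollection<T> source) => _source = source;

    // Raised once for every message taken from the source.
    public event EventHandler<T> MessageReceived;

    public Task Start()
    {
        _loop = Task.Run(() =>
        {
            try
            {
                // Step 1: block until a message arrives or Stop() cancels the token.
                foreach (var message in _source.GetConsumingEnumerable(_cts.Token))
                {
                    // Step 2: hand the message to subscribers.
                    MessageReceived?.Invoke(this, message);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Stop() is called while the loop is waiting.
            }
        });
        return Task.CompletedTask;
    }

    public async Task Stop()
    {
        _cts.Cancel();
        await _loop;
    }
}
```

A real pump would wrap the subscriber call in error handling so that one failing handler does not end the loop.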

The NimbusMessage Envelope

All messages are wrapped in a transport-agnostic NimbusMessage envelope:

public class NimbusMessage
{
    // Identity
    public Guid MessageId { get; set; }
    public Guid CorrelationId { get; set; }

    // Routing
    public string From { get; set; }
    public string To { get; set; }
    public string DeliverTo { get; set; }

    // Scheduling
    public DateTimeOffset? DeliverAfter { get; set; }
    public DateTimeOffset? ExpiresAfter { get; set; }

    // Retry handling
    public int DeliveryAttempts { get; set; }

    // Extensibility
    public Dictionary<string, object> Properties { get; set; }

    // Payload
    public object Payload { get; set; }
}

Native Message Mapping

Each transport maps NimbusMessage to its native format:

Azure Service Bus → Message:

var brokeredMessage = new Message
{
    MessageId = nimbusMessage.MessageId.ToString(),
    CorrelationId = nimbusMessage.CorrelationId.ToString(),
    Body = SerializePayload(nimbusMessage.Payload),
};

// DeliverAfter and ExpiresAfter are nullable, so map them only when set
if (nimbusMessage.DeliverAfter.HasValue)
{
    brokeredMessage.ScheduledEnqueueTimeUtc = nimbusMessage.DeliverAfter.Value.UtcDateTime;
}

if (nimbusMessage.ExpiresAfter.HasValue)
{
    brokeredMessage.TimeToLive = nimbusMessage.ExpiresAfter.Value - DateTimeOffset.UtcNow;
}

foreach (var prop in nimbusMessage.Properties)
{
    brokeredMessage.UserProperties.Add(prop.Key, prop.Value);
}

Redis → JSON String:

var redisMessage = JsonConvert.SerializeObject(new
{
    MessageId = nimbusMessage.MessageId,
    CorrelationId = nimbusMessage.CorrelationId,
    Payload = nimbusMessage.Payload,
    Properties = nimbusMessage.Properties,
    // ... other fields
});

Transport Configuration

Each transport has its own configuration class:

Azure Service Bus

var transport = new AzureServiceBusTransportConfiguration()
    .WithConnectionString(connectionString)
    .WithDefaultTimeout(TimeSpan.FromSeconds(30))
    .WithMaxConcurrentCalls(10);

Redis

var transport = new RedisTransportConfiguration()
    .WithConnectionString("localhost:6379")
    .WithDatabase(0)
    .WithPollingInterval(TimeSpan.FromMilliseconds(100));

AMQP (ActiveMQ Artemis)

var transport = new AMQPTransportConfiguration()
    .WithBrokerUri("amqp://localhost:5672")
    .WithUsername("admin")
    .WithPassword("admin");

In-Process

var transport = new InProcessTransportConfiguration()
    .WithMaximumQueueLength(1000);

Transport Features Matrix

Different transports support different features:

Feature | Azure Service Bus | Redis | AMQP | In-Process
Queues
Topics
Scheduled Delivery: ⚠️ Limited, ⚠️ Limited
Message TTL
Dead Letter Queue: ⚠️ Manual, ⚠️ Manual
Transactions: ⚠️ Limited
Large Messages | ⚠️ 256KB | ⚠️ 512MB | ⚠️ Limited | ✅ Unlimited
Persistence

Feature Compatibility: While Nimbus provides a unified API, some features may work differently or not be available on all transports. Always test your application against the target transport.
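
One way to catch a size mismatch early is a guard that runs before sending. The sketch below is a hypothetical helper and not part of Nimbus. It takes its two limits from the Large Messages row of the matrix and treats any transport without an entry as having no known limit. Real brokers also count headers and properties toward the limit, so treat the check as approximate.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical helper for illustration; the names are assumptions.
public static class TransportLimits
{
    private static readonly IReadOnlyDictionary<string, long> MaxMessageBytes =
        new Dictionary<string, long>
        {
            ["AzureServiceBus"] = 256L * 1024,   // 256KB
            ["Redis"] = 512L * 1024 * 1024,      // 512MB
        };

    // Returns true when the UTF-8 encoded payload fits within the transport's limit.
    // Transports without an entry are treated as having no known limit.
    public static bool Fits(string transportType, string serializedPayload)
    {
        if (!MaxMessageBytes.TryGetValue(transportType, out var limit))
        {
            return true;
        }

        return Encoding.UTF8.GetByteCount(serializedPayload) <= limit;
    }
}
```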

How Abstraction Works in Practice

1. Application Code Stays the Same

// This code works with ANY transport
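public class OrderService
{
    private readonly IBus _bus;

    // The bus is injected, so the service never sees which transport is behind it
    public OrderService(IBus bus)
    {
        _bus = bus;
    }

    public async Task PlaceOrder(Order order)
    {
        await _bus.Send(new PlaceOrderCommand
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId
        });
    }
}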

2. Only Configuration Changes

Development (In-Process):

var bus = new BusBuilder()
    .Configure()
    .WithTransport(new InProcessTransportConfiguration())
    .WithNames("OrderService", Environment.MachineName)
    .Build();

Staging (Redis):

var bus = new BusBuilder()
    .Configure()
    .WithTransport(new RedisTransportConfiguration()
        .WithConnectionString("redis-staging.example.com"))
    .WithNames("OrderService", Environment.MachineName)
    .Build();

Production (Azure Service Bus):

var bus = new BusBuilder()
    .Configure()
    .WithTransport(new AzureServiceBusTransportConfiguration()
        .WithConnectionString(azureConnectionString))
    .WithNames("OrderService", Environment.MachineName)
    .Build();

Benefits of Transport Abstraction

1. Easy Testing

Use the in-process transport for unit and integration tests:

[Test]
public async Task OrderService_Should_SendCommand()
{
    // Arrange
    var transport = new InProcessTransportConfiguration();
    var bus = new BusBuilder()
        .Configure()
        .WithTransport(transport)
        .Build();

    var orderService = new OrderService(bus);

    // Act
    await orderService.PlaceOrder(new Order { Id = "123" });

    // Assert
    var messages = transport.GetSentMessages();
    Assert.That(messages, Has.Count.EqualTo(1));
}

2. Gradual Migration

Migrate from one transport to another service-by-service:

[Old System - AMQP/Artemis] → [Bridge Service] → [New System - Azure Service Bus]
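
A bridge of this kind can be very small. The sketch below assumes the bridge handles messages arriving from the old system and forwards each one through a delegate that sends to the new system. The BridgeService class and its members are hypothetical names chosen for illustration. The delegate stands in for whatever send call the new bus exposes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical bridge for illustration; not a Nimbus type.
public sealed class BridgeService
{
    private readonly Func<object, Task> _sendToNewSystem;
    private int _forwardedCount;

    public BridgeService(Func<object, Task> sendToNewSystem)
    {
        _sendToNewSystem = sendToNewSystem ?? throw new ArgumentNullException(nameof(sendToNewSystem));
    }

    // Number of messages forwarded so far; useful for monitoring the migration.
    public int ForwardedCount => _forwardedCount;

    // Call this from the handler attached to the old system.
    public async Task Forward(object message)
    {
        await _sendToNewSystem(message);
        Interlocked.Increment(ref _forwardedCount);
    }
}
```

Because the counter is incremented only after the send completes, a failed send propagates to the caller and the old system's own retry handling can redeliver the message.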

3. Multi-Transport Scenarios

Use different transports for different purposes:

// Critical messages → Azure Service Bus (durable, at-least-once delivery)
var criticalBus = new BusBuilder()
    .Configure()
    .WithTransport(azureServiceBusTransport)
    .Build();

// High-volume events → Redis (fast, less overhead)
var eventBus = new BusBuilder()
    .Configure()
    .WithTransport(redisTransport)
    .Build();

4. Vendor Independence

Avoid vendor lock-in. You can switch transports without rewriting your application if your messaging provider:

  • Increases prices significantly
  • Deprecates features you depend on
  • Has reliability issues
  • Doesn’t meet performance requirements

Transport Implementation Details

Queue vs. Topic Abstraction

Queues (Point-to-Point):

  • One message → One consumer
  • Load balancing across consumers
  • Used for commands and requests

Topics (Pub/Sub):

  • One message → Many subscribers
  • Each subscription gets a copy
  • Used for events

// Nimbus automatically chooses queue vs. topic based on message type
await bus.Send(command);     // → Queue
await bus.Publish(@event);   // → Topic
await bus.Request(request);  // → Queue (with reply-to)
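
The difference between the two delivery semantics can be modelled in memory. This is a minimal sketch and not Nimbus code: SketchQueue and SketchTopic are hypothetical types, and the queue's round-robin dispatch is only one possible load-balancing strategy.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of point-to-point delivery.
public sealed class SketchQueue<T>
{
    private readonly List<Action<T>> _consumers = new List<Action<T>>();
    private int _next;

    public void AddConsumer(Action<T> consumer) => _consumers.Add(consumer);

    // Exactly one consumer handles each message (round-robin here).
    public void Send(T message)
    {
        if (_consumers.Count == 0)
        {
            throw new InvalidOperationException("No consumers registered.");
        }

        _consumers[_next](message);
        _next = (_next + 1) % _consumers.Count;
    }
}

// Hypothetical in-memory model of publish/subscribe delivery.
public sealed class SketchTopic<T>
{
    private readonly List<Action<T>> _subscriptions = new List<Action<T>>();

    public void Subscribe(Action<T> subscription) => _subscriptions.Add(subscription);

    // Every subscription receives the message.
    public void Publish(T message)
    {
        foreach (var subscription in _subscriptions)
        {
            subscription(message);
        }
    }
}
```

With two consumers on the queue, two sends result in one message each. With two subscriptions on the topic, a single publish reaches both.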

Connection Management

Transports handle connection pooling and resilience:

public class AzureServiceBusTransport : INimbusTransport
{
    private readonly string _connectionString;
    private readonly ConcurrentDictionary<string, QueueClient> _queueClients;
    private readonly RetryPolicy _retryPolicy;

    public INimbusMessageSender GetQueueSender(string queuePath)
    {
        // Reuse one client per queue path and create it on first use
        var client = _queueClients.GetOrAdd(queuePath, path =>
            new QueueClient(_connectionString, path, retryPolicy: _retryPolicy));

        return new AzureServiceBusQueueSender(client);
    }

    // Remaining INimbusTransport members omitted for brevity
}

Custom Transport Implementation

You can implement your own transport for specialized scenarios:

public class CustomTransport : INimbusTransport
{
    public INimbusMessageSender GetQueueSender(string queuePath)
    {
        return new CustomQueueSender(queuePath);
    }

    public INimbusMessageReceiver GetQueueReceiver(string queuePath)
    {
        return new CustomQueueReceiver(queuePath);
    }

    // Implement topic methods...
}

Then use it in your configuration:

var bus = new BusBuilder()
    .Configure()
    .WithTransport(new CustomTransport())
    .Build();

Creating Custom Transports: Useful for integrating with proprietary messaging systems, testing scenarios, or specialized requirements like file-system based messaging.

Best Practices

1. Design for the Lowest Common Denominator

Not all transports support all features. Design your messages to work with basic capabilities:

// ✅ GOOD: Works everywhere
public class SimpleCommand : IBusCommand
{
    public string Id { get; set; }
    public string Data { get; set; }
}

// ⚠️ CAREFUL: Requires large message support
public class LargeCommand : IBusCommand
{
    public byte[] BigData { get; set; } // 10MB file
}
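
One common way to keep large payloads portable is to store the data outside the message and send only a reference to it. The sketch below illustrates that idea under stated assumptions: InMemoryBlobStore is a hypothetical stand-in for external storage, and LargeDataReferenceCommand is a hypothetical message that would implement IBusCommand in a Nimbus application.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for external storage such as a file share or object store.
public sealed class InMemoryBlobStore
{
    private readonly ConcurrentDictionary<string, byte[]> _blobs =
        new ConcurrentDictionary<string, byte[]>();

    // Stores the data and returns the key needed to read it back.
    public string Save(byte[] data)
    {
        var key = Guid.NewGuid().ToString("N");
        _blobs[key] = data;
        return key;
    }

    public byte[] Load(string key) => _blobs[key];
}

// Small message that carries a reference instead of the payload itself.
public class LargeDataReferenceCommand
{
    public string BlobKey { get; set; }
    public long SizeInBytes { get; set; }
}
```

The sender saves the data and puts the returned key in the command. The handler loads the data by key, so the message stays small on every transport.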

2. Abstract Transport Configuration

Don’t hardcode transport configuration. Use dependency injection and configuration files:

public class MessagingConfig
{
    public string TransportType { get; set; }
    public string ConnectionString { get; set; }
}

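// WithTransport(...) receives a transport configuration object, so the factory returns one.
// TransportConfiguration is assumed here to be the shared base type of the configuration classes.
public static TransportConfiguration CreateTransport(MessagingConfig config)
{
    return config.TransportType switch
    {
        "AzureServiceBus" => new AzureServiceBusTransportConfiguration()
            .WithConnectionString(config.ConnectionString),
        "Redis" => new RedisTransportConfiguration()
            .WithConnectionString(config.ConnectionString),
        _ => throw new NotSupportedException(
            $"Unsupported transport type: {config.TransportType}")
    };
}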

3. Test Against Target Transport

Always run integration tests against the transport you use in production:

[Test]
[Category("Integration")]
public async Task EndToEnd_With_AzureServiceBus()
{
    var transport = new AzureServiceBusTransportConfiguration()
        .WithConnectionString(testConnectionString);

    // Test with actual Azure Service Bus
}

Next Steps