# Nimbus

Nimbus is a transport-agnostic .NET 9 messaging library. It provides a uniform API for commands, events, and request/response messaging over pluggable transports (Azure Service Bus, Redis, AMQP/ActiveMQ Artemis, SQL Server, PostgreSQL, In-Process).

Full documentation: https://nimbusapi.com/docs

## Message Patterns

Nimbus supports four patterns:

| Pattern | Contract interface | Handler interface | IBus method |
|---|---|---|---|
| Command | `IBusCommand` | `IHandleCommand<>` | `bus.Send(cmd)` |
| Competing Event | `IBusEvent` | `IHandleCompetingEvent<>` | `bus.Publish(evt)` |
| Multicast Event | `IMulticastEvent` | `IHandleMulticastEvent<>` | `bus.Publish(evt)` |
| Request/Response | `IBusRequest<,>` | `IHandleRequest<,>` | `bus.Request(req)` |

- **Command**: fire-and-forget, one handler, queue-based, competing consumers for horizontal scaling
- **Competing Event**: each event is handled by one instance per subscriber group; use for side effects (send email, update a record)
- **Multicast Event**: every running instance receives every event; use for cache invalidation and broadcast
- **Request**: caller awaits a typed response from exactly one handler

## NuGet Packages

```
dotnet add package Nimbus
dotnet add package Nimbus.Transports.AzureServiceBus
dotnet add package Nimbus.Transports.Redis
dotnet add package Nimbus.Transports.AMQP
dotnet add package Nimbus.Transports.SqlServer
dotnet add package Nimbus.Transports.Postgres
dotnet add package Nimbus.Containers.Autofac
dotnet add package Nimbus.Serializers.Json
```

## Bus Setup

```csharp
using Autofac;
using Nimbus;
using Nimbus.Configuration;
using Nimbus.Containers.Autofac;
using Nimbus.Transports.Redis;

// 1. Register handlers in DI container
var builder = new ContainerBuilder();
builder.RegisterAssemblyTypes(typeof(MyHandler).Assembly).AsImplementedInterfaces();
var container = builder.Build();

// 2. Type provider: scans assemblies for message types and handlers
var typeProvider = new AssemblyScanningTypeProvider(
    typeof(MyCommand).Assembly,
    typeof(MyHandler).Assembly
);

// 3. Build and start the bus (singleton: one per process)
var bus = new BusBuilder()
    .Configure()
    .WithNames("MyService", Environment.MachineName)
    .WithTransport(new RedisTransportConfiguration()
        .WithConnectionString("localhost:6379"))
    .WithTypesFrom(typeProvider)
    .WithAutofacDefaults(container.BeginLifetimeScope())
    .Build();

await bus.Start();
// ...
await bus.Stop();
```

## Commands

```csharp
// Contract (shared project)
public class PlaceOrderCommand : IBusCommand
{
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
}

// Handler (resolved from DI, constructor injection works)
public class PlaceOrderHandler : IHandleCommand<PlaceOrderCommand>
{
    private readonly IOrderRepository _repo;

    public PlaceOrderHandler(IOrderRepository repo) => _repo = repo;

    public async Task Handle(PlaceOrderCommand command)
    {
        await _repo.Save(new Order { Id = command.OrderId, Amount = command.Amount });
    }
}

// Sending
await bus.Send(new PlaceOrderCommand { OrderId = "ORD-1", Amount = 99.99m });

// Delayed (transport support varies: native on Artemis, polling-based on SQL/Postgres)
await bus.SendAfter(new PlaceOrderCommand { ... }, TimeSpan.FromMinutes(5));
await bus.SendAt(new PlaceOrderCommand { ... }, DateTimeOffset.UtcNow.AddHours(1));
```

## Events

```csharp
// Competing event: one handler instance processes each event
public class OrderPlacedEvent : IBusEvent
{
    public string OrderId { get; set; }
}

public class SendConfirmationHandler : IHandleCompetingEvent<OrderPlacedEvent>
{
    public async Task Handle(OrderPlacedEvent @event) { /* runs once per event */ }
}

// Multicast event: every instance receives every event
public class PriceCatalogUpdatedEvent : IMulticastEvent { }

public class ClearPriceCacheHandler : IHandleMulticastEvent<PriceCatalogUpdatedEvent>
{
    public async Task Handle(PriceCatalogUpdatedEvent @event) { /* runs on every instance */ }
}

// Publishing (same call for both types)
await bus.Publish(new OrderPlacedEvent { OrderId = "ORD-1" });
```

Multiple independent handlers can subscribe to the same event; each gets its own copy and runs independently.

## Request / Response

```csharp
// Request and response types
public class GetOrderRequest : IBusRequest<GetOrderRequest, OrderDto>
{
    public string OrderId { get; set; }
}

public class OrderDto
{
    public string OrderId { get; set; }
    public string Status { get; set; }
}

// Handler
public class GetOrderHandler : IHandleRequest<GetOrderRequest, OrderDto>
{
    private readonly IOrderRepository _repo;

    public GetOrderHandler(IOrderRepository repo) => _repo = repo;

    public async Task<OrderDto> Handle(GetOrderRequest request)
    {
        var order = await _repo.GetById(request.OrderId);
        return order == null ? null : new OrderDto { OrderId = order.Id, Status = order.Status };
    }
}

// Sending: the caller awaits until a response arrives or the timeout elapses
var result = await bus.Request(
    new GetOrderRequest { OrderId = "ORD-1" },
    timeout: TimeSpan.FromSeconds(10) // optional; throws TimeoutException if exceeded
);
```

## Transport Configuration

### Azure Service Bus

```csharp
// Connection string
new AzureServiceBusTransportConfiguration()
    .WithConnectionString("Endpoint=sb://your-namespace.servicebus.windows.net/;...")

// Managed identity (recommended for production)
new AzureServiceBusTransportConfiguration()
    .WithNamespaceEndpoint("your-namespace.servicebus.windows.net")
    .WithManagedIdentity()

// Large messages (>256 KB standard / >1 MB premium): offload body to blob storage
    .WithLargeMessageBodyStore(new AzureBlobStorageLargeMessageBodyStore(
        storageConnectionString, containerName: "nimbus-large-messages"))
```

### Redis

```csharp
new RedisTransportConfiguration()
    .WithConnectionString("localhost:6379")

// with password:
    .WithConnectionString("localhost:6379,password=secret")

// SSL (e.g. Azure Cache for Redis):
    .WithConnectionString("your-cache.redis.cache.windows.net:6380,password=...,ssl=true,abortConnect=false")
```

Note: Redis Pub/Sub (used for events) does not persist messages. Events published with no active subscribers are lost.

### AMQP (ActiveMQ Artemis)

Tested against ActiveMQ Artemis. Supports Artemis-native scheduled delivery via `_AMQ_SCHED_DELIVERY` (requires Artemis 2.x+).

```csharp
new AMQPTransportConfiguration()
    .WithBrokerUri("amqp://localhost:5672")
    .WithCredentials("admin", "admin")

// Failover for production:
new AMQPTransportConfiguration()
    .WithFailover("amqp://broker1:5672", "amqp://broker2:5672")
    .WithCredentials("admin", "admin")
```

Run Artemis locally:

```
docker run -d --name artemis -p 5672:5672 -p 8161:8161 \
  -e ARTEMIS_USER=admin -e ARTEMIS_PASSWORD=admin \
  apache/activemq-artemis:latest
```

### SQL Server

```csharp
new SqlServerTransportConfiguration()
    .WithConnectionString("Server=localhost;Database=Nimbus;Integrated Security=true;")
    .WithAutoCreateSchema() // creates tables on startup (idempotent)
    .WithPollInterval(TimeSpan.FromMilliseconds(250)) // default 1 s
```

Creates three tables: `NimbusMessages`, `NimbusSubscriptions`, `NimbusDeadLetters`.

### PostgreSQL

```csharp
new PostgresTransportConfiguration()
    .WithConnectionString("Host=localhost;Database=nimbus;Username=postgres;Password=secret")
    .WithAutoCreateSchema()
    .WithPollInterval(TimeSpan.FromMilliseconds(250))
```

Creates three tables: `nimbus_messages`, `nimbus_subscriptions`, `nimbus_dead_letters`.

### In-Process (testing only)

```csharp
new InProcessTransportConfiguration()
```

No external dependencies. All messages are held in memory. Not for production.

## Dependency Injection

Handlers are resolved from your DI container. Register with `AsImplementedInterfaces()`:

```csharp
// Autofac: scan assembly
builder.RegisterAssemblyTypes(typeof(MyHandler).Assembly).AsImplementedInterfaces();

// Or individually
builder.RegisterType<MyHandler>().AsImplementedInterfaces();
```

`IBus` can be injected into handlers to send/publish follow-up messages.

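
A minimal sketch of a handler that takes `IBus` as a constructor dependency and publishes a follow-up event once its own work is done. It reuses the `PlaceOrderCommand` and `OrderPlacedEvent` contracts from the earlier sections. The handler name is illustrative, and the sketch assumes that `IBus` lives in the `Nimbus` namespace and is registered with your container.

```csharp
using System.Threading.Tasks;
using Nimbus;                   // assumed namespace for IBus
using Nimbus.Handlers;
using Nimbus.MessageContracts;

// Contracts repeated from the earlier sections so the sketch reads on its own.
public class PlaceOrderCommand : IBusCommand
{
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
}

public class OrderPlacedEvent : IBusEvent
{
    public string OrderId { get; set; }
}

// Illustrative handler name; IBus is supplied by the DI container.
public class PlaceOrderAndAnnounceHandler : IHandleCommand<PlaceOrderCommand>
{
    private readonly IBus _bus;

    public PlaceOrderAndAnnounceHandler(IBus bus) => _bus = bus;

    public async Task Handle(PlaceOrderCommand command)
    {
        // ... persist the order here ...

        // Follow-up message: tell subscribers that the order now exists.
        await _bus.Publish(new OrderPlacedEvent { OrderId = command.OrderId });
    }
}
```

Publishing only after the handler's own work has succeeded keeps a failed command from announcing an order that was never stored.
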
## Key Namespaces

| Namespace | Contents |
|---|---|
| `Nimbus.MessageContracts` | `IBusCommand`, `IBusEvent`, `IMulticastEvent`, `IBusRequest<,>` |
| `Nimbus.Handlers` | `IHandleCommand<>`, `IHandleCompetingEvent<>`, `IHandleMulticastEvent<>`, `IHandleRequest<,>` |
| `Nimbus.Configuration` | `BusBuilder`, `AssemblyScanningTypeProvider` |
| `Nimbus.Transports.Redis` | `RedisTransportConfiguration` |
| `Nimbus.Transports.AzureServiceBus` | `AzureServiceBusTransportConfiguration` |
| `Nimbus.Transports.AMQP` | `AMQPTransportConfiguration` |
| `Nimbus.Transports.SqlServer` | `SqlServerTransportConfiguration` |
| `Nimbus.Transports.Postgres` | `PostgresTransportConfiguration` |
| `Nimbus.Containers.Autofac` | `WithAutofacDefaults()` extension |

## Handler Design

- Handlers should be **idempotent**: the same message may be delivered more than once due to transport retries
- Throw exceptions to signal failure; the transport will redeliver and eventually dead-letter after exhausting retries
- Keep message contracts in a shared project referenced by both producers and consumers
- The bus is a singleton: create one instance per process and inject it via DI
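
The first two points can be sketched as a command handler that checks for existing work before acting and lets exceptions propagate. The `Order` and `IOrderRepository` shapes below are assumptions based on how they are used in the earlier examples, and the handler name is illustrative; none of them are part of Nimbus.

```csharp
using System.Threading.Tasks;
using Nimbus.Handlers;
using Nimbus.MessageContracts;

// Assumed shapes for the application types used in the earlier examples.
public class Order
{
    public string Id { get; set; }
    public decimal Amount { get; set; }
}

public interface IOrderRepository
{
    Task<Order> GetById(string id); // completes with null when no such order exists
    Task Save(Order order);
}

public class PlaceOrderCommand : IBusCommand
{
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
}

// Illustrative handler name.
public class IdempotentPlaceOrderHandler : IHandleCommand<PlaceOrderCommand>
{
    private readonly IOrderRepository _repo;

    public IdempotentPlaceOrderHandler(IOrderRepository repo) => _repo = repo;

    public async Task Handle(PlaceOrderCommand command)
    {
        // A redelivered command finds the order already stored and does nothing.
        var existing = await _repo.GetById(command.OrderId);
        if (existing != null) return;

        // Exceptions from Save are not caught, so they reach the bus as a failure.
        await _repo.Save(new Order { Id = command.OrderId, Amount = command.Amount });
    }
}
```

The check-then-save step is not atomic across competing consumers, so a unique key on the order ID in the underlying store is a sensible backstop.
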