skills/microservice/processor/hosted-service/SKILL.md
Use when implementing IHostedService with ServiceBusSessionProcessor and DLQ handling.
npx skillsauth add faysilalshareef/dotnet-ai-kit hosted-service
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
- IHostedService directly (not BackgroundService)
- ServiceBusSessionProcessor AND a paired ServiceBusProcessor for the dead letter sub-queue
- AutoCompleteMessages = false -- always complete/abandon explicitly
- ServiceBusClient
- StartAsync starts both processors; StopAsync closes both
- ProcessErrorAsync callback with LogCritical

namespace {Company}.{Domain}.Processor.Infra.ServiceBus.Listeners;
/// <summary>
/// Hosted service that listens on the order command topic with two processors:
/// a session processor for the main subscription and a plain processor bound to
/// the same subscription's dead-letter sub-queue. Both processors settle messages
/// explicitly (auto-complete is disabled).
/// </summary>
public class OrderListener : IHostedService
{
    private readonly ILogger<OrderListener> _logger;
    private readonly IServiceProvider _provider;
    private readonly ServiceBusSessionProcessor _processor;
    private readonly ServiceBusProcessor _deadLetterProcessor;

    /// <summary>
    /// Creates both processors and wires their message and error handlers.
    /// Nothing is started here; processing begins in <see cref="StartAsync"/>.
    /// </summary>
    /// <param name="serviceProvider">Service provider stored for use by the message handlers.</param>
    /// <param name="logger">Logger used by the error callbacks.</param>
    /// <param name="options">Supplies the topic and subscription names.</param>
    /// <param name="orderServiceBus">Typed wrapper exposing the <c>ServiceBusClient</c> to create processors from.</param>
    public OrderListener(
        IServiceProvider serviceProvider,
        ILogger<OrderListener> logger,
        IOptions<ServiceBusOptions> options,
        OrderServiceBus orderServiceBus)
    {
        var serviceBusOptions = options.Value;
        _logger = logger;
        _provider = serviceProvider;

        // Main session processor for ordered event processing
        _processor = orderServiceBus.Client.CreateSessionProcessor(
            serviceBusOptions.OrderCommandTopic,
            serviceBusOptions.Subscription,
            new ServiceBusSessionProcessorOptions()
            {
                PrefetchCount = 1,
                MaxConcurrentCallsPerSession = 1,
                MaxConcurrentSessions = 1000,
                AutoCompleteMessages = false,
                SessionIdleTimeout = TimeSpan.FromMinutes(1),
                MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5),
            });
        _processor.ProcessMessageAsync += Processor_ProcessMessageAsync;
        _processor.ProcessErrorAsync += Processor_ProcessErrorAsync;

        // Paired DLQ processor for reprocessing dead-lettered messages
        _deadLetterProcessor = orderServiceBus.Client.CreateProcessor(
            serviceBusOptions.OrderCommandTopic,
            serviceBusOptions.Subscription,
            new ServiceBusProcessorOptions()
            {
                PrefetchCount = 1,
                AutoCompleteMessages = false,
                MaxConcurrentCalls = 1,
                SubQueue = SubQueue.DeadLetter,
            });
        _deadLetterProcessor.ProcessMessageAsync += DeadLetterProcessor_ProcessMessageAsync;
        _deadLetterProcessor.ProcessErrorAsync += DeadLetterProcessor_ProcessErrorAsync;
    }

    /// <summary>
    /// Routes a session message by subject, then completes it when the handler
    /// reports success and abandons it otherwise.
    /// </summary>
    private async Task Processor_ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
    {
        var isHandled = await HandleSubject(arg.Message.Subject, arg.Message);
        if (isHandled)
        {
            await arg.CompleteMessageAsync(arg.Message);
        }
        else
        {
            await arg.AbandonMessageAsync(arg.Message);
        }
    }

    /// <summary>
    /// Error callback of the main processor: logs the exception at Critical level
    /// together with the error source and entity path.
    /// </summary>
    private Task Processor_ProcessErrorAsync(ProcessErrorEventArgs arg)
    {
        _logger.LogCritical(arg.Exception,
            "OrderListener => Processor_ProcessErrorAsync " +
            "Message handler encountered an exception," +
            " Error Source:{ErrorSource}," +
            " Entity Path:{EntityPath}",
            arg.ErrorSource.ToString(),
            arg.EntityPath);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Handles a dead-lettered message with the same subject routing as the main
    /// processor, then completes or abandons it based on the handler result.
    /// </summary>
    private async Task DeadLetterProcessor_ProcessMessageAsync(ProcessMessageEventArgs arg)
    {
        // DLQ handler uses the SAME routing logic as the main processor
        var isHandled = await HandleSubject(arg.Message.Subject, arg.Message);
        if (isHandled)
        {
            await arg.CompleteMessageAsync(arg.Message);
        }
        else
        {
            await arg.AbandonMessageAsync(arg.Message);
        }
    }

    /// <summary>
    /// Error callback of the dead-letter processor: logs the exception at Critical
    /// level together with the error source and entity path.
    /// </summary>
    private Task DeadLetterProcessor_ProcessErrorAsync(ProcessErrorEventArgs arg)
    {
        _logger.LogCritical(arg.Exception,
            "DeadLetter Message handler encountered an exception," +
            " Error Source:{ErrorSource}," +
            " Entity Path:{EntityPath}",
            arg.ErrorSource.ToString(),
            arg.EntityPath);
        return Task.CompletedTask;
    }

    // Subject routing and HandleAsync defined in event-routing skill
    /// <summary>
    /// Dispatches a message to the typed handler that matches its subject.
    /// </summary>
    /// <returns>
    /// The handler's result; <c>true</c> for subjects with no matching case, so
    /// unknown events are completed rather than redelivered.
    /// </returns>
    private Task<bool> HandleSubject(string subject, ServiceBusReceivedMessage message)
    {
        // See event-routing SKILL for full pattern
        return subject switch
        {
            EventType.OrderCreated => HandleAsync<OrderCreatedData>(message),
            EventType.OrderUpdated => HandleAsync<OrderUpdatedData>(message),
            _ => Task.FromResult(true) // Complete unknown events
        };
    }

    // Placeholder body: the real implementation lives in the event-routing skill.
    private async Task<bool> HandleAsync<T>(ServiceBusReceivedMessage message) { /* see event-routing */ return true; }

    /// <summary>
    /// Starts the main processor and then the dead-letter processor.
    /// Both start calls are awaited so that a failure to start surfaces to the host
    /// instead of being lost in an unobserved task.
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _processor.StartProcessingAsync(cancellationToken);
        await _deadLetterProcessor.StartProcessingAsync(cancellationToken);
    }

    /// <summary>
    /// Closes both processors and waits for the close operations to finish, so the
    /// host does not continue shutting down while they are still closing.
    /// </summary>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.CloseAsync(cancellationToken);
        }
        finally
        {
            // Close the DLQ processor even when closing the main processor throws.
            await _deadLetterProcessor.CloseAsync(cancellationToken);
        }
    }
}
namespace {Company}.{Domain}.Processor.Infra.ServiceBus.Clients;
/// <summary>
/// Typed wrapper around the <see cref="ServiceBusClient"/> for the order Service Bus.
/// Each external Service Bus namespace gets its own wrapper type like this one.
/// </summary>
public class OrderServiceBus
{
    /// <summary>Builds the client from the given connection string.</summary>
    /// <param name="connectionString">Connection string passed to the <see cref="ServiceBusClient"/> constructor.</param>
    public OrderServiceBus(string connectionString)
    {
        Client = new ServiceBusClient(connectionString);
    }

    /// <summary>The client created at construction time.</summary>
    public ServiceBusClient Client { get; }
}
namespace {Company}.{Domain}.Processor.Infra.ServiceBus;
/// <summary>
/// Settings consumed by the Service Bus listeners.
/// </summary>
public class ServiceBusOptions
{
    /// <summary>
    /// Name associated with these options — presumably the configuration section
    /// they are bound from (confirm against the options registration).
    /// </summary>
    public const string Options = "ServiceBus";

    /// <summary>Topic name handed to the processors when they are created.</summary>
    [Required]
    public required string OrderCommandTopic { get; init; }

    /// <summary>Subscription name handed to the processors when they are created.</summary>
    [Required]
    public required string Subscription { get; init; }

    /// <summary>Batch size setting; defaults to 50.</summary>
    public int BatchSize { get; init; } = 50;

    /// <summary>Maximum concurrent sessions setting; defaults to 100.</summary>
    public int MaxConcurrentSessions { get; init; } = 100;
}
// In ServiceBusRegistrationExtensions.cs

// One shared OrderServiceBus wrapper, built from the configured connection string.
services.AddSingleton(sp =>
    new OrderServiceBus(
        sp.GetRequiredService<IOptions<ConnectionStringsOption>>().Value.OrderServiceBus));

// Each listener is registered as its own hosted service.
services.AddHostedService<OrderListener>();
services.AddHostedService<ProductListener>();
| Option                       | Value  | Purpose                                       |
|------------------------------|--------|-----------------------------------------------|
| PrefetchCount                | 1      | Conservative prefetch for reliability         |
| MaxConcurrentCallsPerSession | 1      | Strict ordering within each session           |
| MaxConcurrentSessions        | 1000   | High parallelism across different aggregates  |
| AutoCompleteMessages         | false  | Explicit complete/abandon for error control   |
| SessionIdleTimeout           | 1 min  | Release session lock after idle period        |
| MaxAutoLockRenewalDuration   | 5 min  | Renew lock for long-running handlers          |
| Option               | Value               | Purpose                            |
|----------------------|---------------------|------------------------------------|
| PrefetchCount        | 1                   | Process one DLQ message at a time  |
| AutoCompleteMessages | false               | Explicit complete/abandon          |
| MaxConcurrentCalls   | 1                   | Serial DLQ processing              |
| SubQueue             | SubQueue.DeadLetter | Target the dead letter sub-queue   |
| Anti-Pattern | Correct Approach |
|---|---|
| Creating processor in StartAsync | Create in constructor, start in StartAsync |
| Missing DLQ processor | Always pair main processor with DLQ processor |
| AutoCompleteMessages = true | Set false for explicit control |
| Using StopProcessingAsync | Use CloseAsync in StopAsync |
| Missing error handler | Always wire ProcessErrorAsync with LogCritical |
# Find IHostedService implementations
# (the pattern only matches lines where IHostedService comes right after ": ",
#  i.e. where it is the first type listed after the colon)
grep -r ": IHostedService" --include="*.cs" src/
# Find ServiceBusSessionProcessor usage
grep -r "ServiceBusSessionProcessor" --include="*.cs" src/
# Find SubQueue.DeadLetter usage
# (the "." is unescaped, so grep treats it as "any single character")
grep -r "SubQueue.DeadLetter" --include="*.cs" src/
# Find AddHostedService registrations
grep -r "AddHostedService" --include="*.cs" src/
- Infra/ServiceBus/Clients/
- ConnectionStringsOption
- ServiceBusOptions
- IHostedService with paired DLQ processor
- AddHostedService<T>() in DI extension

data-ai
Use when about to claim work is complete, fixed, passing, or ready — before committing, creating PRs, or moving to the next task. Requires running verification commands and confirming output before making any success claims.
development
Use when encountering any bug, test failure, build error, or unexpected behavior — before proposing fixes or making changes.
development
Use when checkpointing, wrapping up, or handing off an AI-assisted development session.
development
Use when following the Specification-Driven Development lifecycle from plan through ship.