skills/microservice/processor/batch-processing/SKILL.md
Use when implementing batch event processing with BackgroundService and concurrency control.
npx skillsauth add faysilalshareef/dotnet-ai-kit batch-processingInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
BackgroundService (NOT IHostedService with ServiceBusSessionProcessor)SemaphoreSlim controls maximum concurrent session processingAcceptNextSessionAsync() manually accepts sessions one at a timeReceiveMessagesAsync(maxMessages: batchSize, maxWaitTime) for batch receiveGroupBy(SourceId).Select(g => g.First()) to keep first occurrenceIMediator.Send() for aggregated processingnamespace {Company}.{Domain}.Processor.Infra.ServiceBus.Listeners;
public class ItemQueueListener : BackgroundService
{
private readonly ILogger<ItemQueueListener> _logger;
private readonly IServiceProvider _provider;
private readonly ServiceBusClient _serviceBusClient;
private readonly string _queueName;
private readonly int _batchSize;
private readonly int _maxConcurrentSessions;
private readonly ServiceBusProcessor _deadLetterProcessor;
public ItemQueueListener(
IServiceProvider serviceProvider,
ILogger<ItemQueueListener> logger,
IOptions<ServiceBusOptions> options,
ItemQueueListenerServiceBus itemServiceBus)
{
var serviceBusOptions = options.Value;
_logger = logger;
_provider = serviceProvider;
_serviceBusClient = itemServiceBus.Client;
_queueName = serviceBusOptions.ItemQueue;
_batchSize = serviceBusOptions.ItemQueueBatchSize;
_maxConcurrentSessions = serviceBusOptions.ItemQueueMaxConcurrentSessions;
// Paired DLQ processor (same pattern as IHostedService listeners)
_deadLetterProcessor = _serviceBusClient.CreateProcessor(
_queueName,
new ServiceBusProcessorOptions()
{
PrefetchCount = 1,
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
SubQueue = SubQueue.DeadLetter,
});
_deadLetterProcessor.ProcessMessageAsync += DeadLetterProcessor_ProcessMessageAsync;
_deadLetterProcessor.ProcessErrorAsync += DeadLetterProcessor_ProcessErrorAsync;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _deadLetterProcessor.StartProcessingAsync(stoppingToken);
var semaphore = new SemaphoreSlim(_maxConcurrentSessions);
while (!stoppingToken.IsCancellationRequested)
{
await semaphore.WaitAsync(stoppingToken);
_ = Task.Run(async () =>
{
try
{
await AcceptAndProcessSessionAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Expected during shutdown
}
catch (Exception ex)
{
_logger.LogError(ex, "Error accepting session from queue");
}
finally
{
semaphore.Release();
}
}, stoppingToken);
}
}
// ... (see below for session processing)
public override async Task StopAsync(CancellationToken cancellationToken)
{
await _deadLetterProcessor.CloseAsync(cancellationToken);
await base.StopAsync(cancellationToken);
}
}
private async Task AcceptAndProcessSessionAsync(CancellationToken stoppingToken)
{
ServiceBusSessionReceiver? receiver = null;
try
{
receiver = await _serviceBusClient.AcceptNextSessionAsync(
_queueName,
new ServiceBusSessionReceiverOptions
{
PrefetchCount = _batchSize,
ReceiveMode = ServiceBusReceiveMode.PeekLock
},
stoppingToken);
await ProcessSessionAsync(receiver, stoppingToken);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceTimeout)
{
// No sessions available -- this is normal
}
finally
{
if (receiver != null)
await receiver.DisposeAsync();
}
}
private async Task ProcessSessionAsync(
ServiceBusSessionReceiver receiver, CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var messages = await receiver.ReceiveMessagesAsync(
maxMessages: _batchSize,
maxWaitTime: TimeSpan.FromSeconds(5),
cancellationToken: stoppingToken);
if (messages == null || messages.Count == 0)
break;
var sessionId = receiver.SessionId;
var sessionEnricher = new PropertyEnricher(name: "SessionId", sessionId);
using (LogContext.Push(sessionEnricher))
{
try
{
// Filter: skip non-matching subjects, complete them
var processableMessages = new List<ServiceBusReceivedMessage>();
var items = new List<ItemChangedItem>();
Guid? accountId = null;
foreach (var message in messages)
{
if (message.Subject != EventType.AccountItemChanged)
{
await receiver.CompleteMessageAsync(message, stoppingToken);
continue;
}
var json = Encoding.UTF8.GetString(message.Body);
var body = JsonConvert.DeserializeObject<ItemChanged>(json);
if (body == null || body.Quantity < 1)
{
await receiver.CompleteMessageAsync(message, stoppingToken);
continue;
}
accountId = body.AccountId;
items.Add(new ItemChangedItem
{
SourceId = body.SourceId,
SourceType = body.SourceType,
Quantity = body.Quantity
});
processableMessages.Add(message);
}
// Deduplicate by SourceId -- keep first occurrence
items = items
.GroupBy(i => i.SourceId)
.Select(g => g.First())
.ToList();
if (items.Count == 0 || accountId == null)
continue;
// Create batch request and send via MediatR
using var scope = _provider.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
var batchRequest = new BatchItemChanged
{
AccountId = accountId.Value,
Items = items
};
var success = await mediator.Send(batchRequest, stoppingToken);
if (success)
{
foreach (var message in processableMessages)
await receiver.CompleteMessageAsync(message, stoppingToken);
}
else
{
foreach (var message in processableMessages)
await receiver.AbandonMessageAsync(message,
cancellationToken: stoppingToken);
}
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error processing batch for session {SessionId}", sessionId);
// Abandon all messages in the batch on error
foreach (var message in messages)
{
try
{
await receiver.AbandonMessageAsync(message,
cancellationToken: stoppingToken);
}
catch (Exception abandonEx)
{
_logger.LogError(abandonEx,
"Error abandoning message {MessageId}", message.MessageId);
}
}
}
}
}
}
namespace {Company}.{Domain}.Processor.Events.Outgoing;
public class BatchItemChanged : IRequest<bool>
{
public required Guid AccountId { get; init; }
public required List<ItemChangedItem> Items { get; init; }
}
public class ItemChangedItem
{
public required Guid SourceId { get; init; }
public required SourceType SourceType { get; init; }
public required int Quantity { get; init; }
}
private async Task DeadLetterProcessor_ProcessMessageAsync(ProcessMessageEventArgs arg)
{
if (arg.Message.Subject != EventType.AccountItemChanged)
{
await arg.CompleteMessageAsync(arg.Message);
return;
}
var json = Encoding.UTF8.GetString(arg.Message.Body);
var body = JsonConvert.DeserializeObject<ItemChanged>(json);
if (body == null || body.Quantity < 1)
{
await arg.CompleteMessageAsync(arg.Message);
return;
}
// DLQ processes messages individually (not as batch)
using var scope = _provider.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
var success = await mediator.Send(new ItemChanged
{
AccountId = body.AccountId,
SourceId = body.SourceId,
SourceType = body.SourceType,
Quantity = body.Quantity
}, arg.CancellationToken);
if (success)
await arg.CompleteMessageAsync(arg.Message);
else
await arg.AbandonMessageAsync(arg.Message);
}
public class ServiceBusOptions
{
public const string Options = "ServiceBus";
[Required]
public required string ItemQueue { get; init; }
public int ItemQueueBatchSize { get; init; } = 50;
public int ItemQueueMaxConcurrentSessions { get; init; } = 100;
}
| Anti-Pattern | Correct Approach | |---|---| | Using ServiceBusSessionProcessor for batching | Use BackgroundService with AcceptNextSessionAsync | | Unbounded concurrency | Use SemaphoreSlim to limit concurrent sessions | | No deduplication | GroupBy SourceId, take first occurrence | | Processing DLQ messages as batch | DLQ handler processes messages individually | | Missing ServiceTimeout catch | Catch ServiceBusException with ServiceTimeout reason | | Forgetting to dispose receiver | Use try/finally with DisposeAsync |
# Find BackgroundService with batch processing
grep -r ": BackgroundService" --include="*.cs" src/
# Find AcceptNextSessionAsync usage
grep -r "AcceptNextSessionAsync" --include="*.cs" src/
# Find ReceiveMessagesAsync batch calls
grep -r "ReceiveMessagesAsync" --include="*.cs" src/
# Find SemaphoreSlim usage
grep -r "SemaphoreSlim" --include="*.cs" src/
data-ai
Use when about to claim work is complete, fixed, passing, or ready — before committing, creating PRs, or moving to the next task. Requires running verification commands and confirming output before making any success claims.
development
Use when encountering any bug, test failure, build error, or unexpected behavior — before proposing fixes or making changes.
development
Use when checkpointing, wrapping up, or handing off an AI-assisted development session.
development
Use when following the Specification-Driven Development lifecycle from plan through ship.