Gargalo em Banco de Dados: Mensageria e Paginação
Introdução#
Todo desenvolvedor .NET que trabalha com aplicações de médio ou grande porte já esbarrou em pelo menos um destes cenários: uma rotina noturna que precisa gravar 50 mil pedidos no banco, uma consulta de relatório que traz 300 mil registros de uma única vez e trava a aplicação, ou uma API que demora 8 segundos para responder porque o EF Core está executando 10 mil INSERT individuais em sequência.
O SQL Server e o Oracle são bancos robustos e maduros, mas nenhum deles foi projetado para receber milhares de gravações em rajada, nem para retornar centenas de milhares de linhas de uma só vez sem custo. O problema, na maioria dos casos, não é o banco — é a forma como a aplicação C# interage com ele.
Neste artigo vamos explorar as duas faces do gargalo: escrita em massa e leitura de grandes volumes. Para cada uma, você vai ver a causa raiz, as particularidades de SQL Server e Oracle com EF Core 8.0+, e a solução prática — mensageria para gravação e paginação eficiente para leitura. Todo o código é produção-ready e compatível com .NET 8/9.
Pré-requisitos: Conhecimento básico de C# e EF Core. Recomenda-se ter lido o artigo sobre programação assíncrona com C# antes de continuar.
📦 Código-fonte: A implementação completa deste artigo está no repositório blog-zocateli-sample no GitHub. Clone, explore e adapte ao seu contexto.
O Gargalo de Escrita: Por Que Gravar Milhares de Registros Dói#
O Custo Real de Cada INSERT#
Quando você salva uma lista de entidades com o EF Core da forma mais comum, algo assim acontece:
// ❌ Abordagem ingênua — InsertOne por vez
foreach (var pedido in listaDe50MilPedidos)
{
await context.Pedidos.AddAsync(pedido);
await context.SaveChangesAsync(); // PROBLEMA: 1 roundtrip por registro!
}
Cada SaveChangesAsync() dentro do loop representa:
- Uma transação aberta → commit → fechada no banco
- Um roundtrip de rede (latência de 1–5ms por chamada)
- Log do banco de dados sendo escrito para cada operação
- Locks de linha sendo alocados e liberados 50 mil vezes
Para 50.000 pedidos com 2ms de latência por roundtrip, isso representa 100 segundos de processamento puro de I/O. E isso é no melhor cenário, sem contenção.
Particularidades: SQL Server vs Oracle#
Ainda que a solução seja similar nos dois bancos, há diferenças importantes:
| Aspecto | SQL Server | Oracle |
|---|---|---|
| Bulk Insert nativo | BULK INSERT / SqlBulkCopy | ODP.NET BulkCopy / INSERT ALL |
| Tamanho máximo de lote padrão | 1.000 linhas por INSERT | Depende do ArrayBindCount |
| Sequências / Identity | IDENTITY ou SEQUENCE | Apenas SEQUENCE (obrigatório) |
| Rollback de bulk | Pode ser minimamente logado | Sempre logado (redo log) |
| EF Core provider | Microsoft.EntityFrameworkCore.SqlServer | Oracle.EntityFrameworkCore |
No Oracle, um detalhe crítico é que o provider oficial (Oracle.EntityFrameworkCore 8.x) exige que você configure a geração de chaves via SEQUENCE + TRIGGER (ou GENERATED ALWAYS AS IDENTITY no Oracle 12c+). Ignorar isso em gravações em massa vai gerar N chamadas extras ao banco só para obter os IDs.
// Oracle: configuração correta no OnModelCreating
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Pedido>(entity =>
{
entity.HasKey(e => e.Id);
// Oracle 12c+: identity nativo, evita roundtrip extra por ID
entity.Property(e => e.Id)
.UseIdentityColumn();
// Alternativa para Oracle 11g/legado
// entity.Property(e => e.Id)
// .HasDefaultValueSql("SEQ_PEDIDOS.NEXTVAL");
});
}
A Solução Imediata: SaveChanges em Lote com EF Core 8#
Antes de introduzir filas, a primeira otimização é mover o SaveChangesAsync() para fora do loop e usar o chunk para não sobrecarregar o contexto:
// ✅ Melhor: batch por chunk + 1 SaveChanges por lote
public async Task GravarPedidosEmLoteAsync(
IEnumerable<Pedido> pedidos,
CancellationToken ct = default)
{
const int tamanhoLote = 500;
foreach (var chunk in pedidos.Chunk(tamanhoLote))
{
await context.Pedidos.AddRangeAsync(chunk, ct);
await context.SaveChangesAsync(ct);
// Limpar o ChangeTracker para não acumular entidades em memória
context.ChangeTracker.Clear();
}
}
💡 Dica: O método Chunk() foi introduzido no .NET 6. Combinado com ChangeTracker.Clear(), evita que o contexto EF Core cresça indefinidamente ao rastrear dezenas de milhares de entidades.
ExecuteInsertAsync e BulkInsert no EF Core 8#
O EF Core 7 trouxe ExecuteUpdateAsync e ExecuteDeleteAsync. O EF Core 8 melhorou o suporte a operações em massa. Para cenários de altíssima performance, use a biblioteca EFCore.BulkExtensions:
// ✅ BulkInsert para SQL Server e Oracle (via EFCore.BulkExtensions)
// dotnet add package EFCore.BulkExtensions
public async Task BulkInsertPedidosAsync(
List<Pedido> pedidos,
CancellationToken ct = default)
{
var bulkConfig = new BulkConfig
{
BatchSize = 1000,
UseTempDB = true, // SQL Server: tabela temporária para staging
SetOutputIdentity = true, // Preenche os IDs gerados pelo banco
PreserveInsertOrder = true
};
await context.BulkInsertAsync(pedidos, bulkConfig, cancellationToken: ct);
}
Com BulkInsert, 50.000 registros que levavam 100 segundos com SaveChanges individual passam a ser gravados em 2–4 segundos no SQL Server, e em 3–6 segundos no Oracle.
Mensageria: A Solução Arquitetural para Gravação em Massa#
Otimizar o próprio INSERT resolve o sintoma, mas não a causa raiz. O problema real é que a aplicação está tentando processar um volume enorme de forma síncrona, bloqueando a thread e a requisição enquanto grava. A solução arquitetural correta é desacoplar a recepção da gravação usando uma fila de mensagens.
Como a Mensageria Resolve o Problema#
Em vez de gravar diretamente no banco ao receber os dados, a aplicação:
- Publica os registros em uma fila (RabbitMQ, Azure Service Bus, etc.) — operação rápida (~1ms por mensagem)
- Retorna imediatamente para o cliente com
202 Accepted - Um Consumer separado lê a fila em lotes e grava no banco com bulk insert
Essa separação traz benefícios além da performance:
- Resiliência: se o banco cair temporariamente, as mensagens ficam na fila. Naão há perda de dados.
- Controle de fluxo: o consumer pode processar na velocidade que o banco suporta
- Backpressure natural: a fila amortece picos de carga
- Observabilidade: você pode monitorar o tamanho da fila e saber exatamente o backlog pendente
Implementação com RabbitMQ.Client#
A biblioteca oficial RabbitMQ.Client é 100% open-source (Apache 2.0) e fornece acesso direto ao protocolo AMQP. Combinada com o BackgroundService do .NET, implementamos um consumer que acumula mensagens em lote e grava tudo de uma vez com BulkInsert.
// dotnet add package RabbitMQ.Client
// dotnet add package EFCore.BulkExtensions
// --- Mensagem ---
public record PedidoCriadoMessage(
Guid Id,
string ClienteId,
decimal Valor,
DateTime DataCriacao);
// --- Producer ---
public class PedidoProducer(IConnection connection)
{
private const string QueueName = "pedidos-queue";
public async Task PublicarAsync(
PedidoCriadoMessage mensagem,
CancellationToken ct = default)
{
await using var channel = await connection.CreateChannelAsync(cancellationToken: ct);
await channel.QueueDeclareAsync(
queue: QueueName,
durable: true,
exclusive: false,
autoDelete: false,
cancellationToken: ct);
var json = JsonSerializer.SerializeToUtf8Bytes(mensagem);
var props = new BasicProperties { DeliveryMode = DeliveryModes.Persistent };
await channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: QueueName,
mandatory: false,
basicProperties: props,
body: json,
cancellationToken: ct);
}
}
// --- Consumer em lote (BackgroundService) ---
public class PedidoConsumerWorker(
IConnection connection,
IServiceScopeFactory scopeFactory,
ILogger<PedidoConsumerWorker> logger) : BackgroundService
{
private const string QueueName = "pedidos-queue";
private const int TamanhoLote = 500;
protected override async Task ExecuteAsync(CancellationToken ct)
{
await using var channel = await connection.CreateChannelAsync(cancellationToken: ct);
await channel.QueueDeclareAsync(
queue: QueueName, durable: true,
exclusive: false, autoDelete: false,
cancellationToken: ct);
// Backpressure: limita mensagens em voo para não sobrecarregar a memória
await channel.BasicQosAsync(
prefetchSize: 0,
prefetchCount: TamanhoLote + 100,
global: false,
cancellationToken: ct);
var lote = new List<(ulong Tag, PedidoCriadoMessage Msg)>();
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) =>
{
var msg = JsonSerializer.Deserialize<PedidoCriadoMessage>(ea.Body.Span)!;
lote.Add((ea.DeliveryTag, msg));
if (lote.Count >= TamanhoLote)
await ProcessarLoteAsync(channel, lote, ct);
};
await channel.BasicConsumeAsync(
queue: QueueName,
autoAck: false,
consumer: consumer,
cancellationToken: ct);
// Timer de flush: força o processamento mesmo que o lote não esteja cheio
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
while (await timer.WaitForNextTickAsync(ct))
{
if (lote.Count > 0)
await ProcessarLoteAsync(channel, lote, ct);
}
}
private async Task ProcessarLoteAsync(
IChannel channel,
List<(ulong Tag, PedidoCriadoMessage Msg)> lote,
CancellationToken ct)
{
var snapshot = lote.ToList();
lote.Clear();
var pedidos = snapshot.Select(x => new Pedido
{
Id = x.Msg.Id,
ClienteId = x.Msg.ClienteId,
Valor = x.Msg.Valor,
DataCriacao = x.Msg.DataCriacao
}).ToList();
using var scope = scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var bulkConfig = new BulkConfig { BatchSize = 500 };
await dbContext.BulkInsertAsync(pedidos, bulkConfig, cancellationToken: ct);
// ACK múltiplo: confirma todo o lote de uma só vez
await channel.BasicAckAsync(
deliveryTag: snapshot[^1].Tag,
multiple: true,
cancellationToken: ct);
logger.LogInformation("Lote de {Count} pedidos gravado", pedidos.Count);
}
}
Registrando no Program.cs#
// Program.cs
// dotnet add package RabbitMQ.Client
builder.Services.AddSingleton<IConnection>(_ =>
{
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
// CreateConnectionAsync retorna Task — resolvemos aqui para o DI container
return factory.CreateConnectionAsync().GetAwaiter().GetResult();
});
builder.Services.AddScoped<PedidoProducer>();
builder.Services.AddHostedService<PedidoConsumerWorker>();
⚠️ Atenção: No Oracle, o BulkInsert do EFCore.BulkExtensions requer o Oracle Data Provider for .NET (ODP.NET). Certifique-se de registrar o provider correto e de que as sequences foram configuradas corretamente no modelo, ou a inserção em massa vai falhar com erro de constraint de chave primária.
RabbitMQ vs Azure Service Bus: Qual Escolher?#
As duas opções resolvem o mesmo problema — desacoplar produção e consumo de mensagens — mas com modelos operacionais e econômicos bem distintos. A escolha depende do seu contexto: infraestrutura self-hosted vs cloud managed, custo variável vs fixo, e grau de controle desejado.
| Critério | RabbitMQ | Azure Service Bus |
|---|---|---|
| Tipo | Open-source (MPL 2.0), self-hosted | PaaS gerenciado pela Microsoft |
| Protocolo | AMQP 0-9-1 (nativo), AMQP 1.0, MQTT, STOMP | AMQP 1.0 |
| Hospedagem | Container próprio, VM, Kubernetes | Azure (sem infra para gerenciar) |
| Custo | Infraestrutura + operação (seu time) | Pay-per-use (~$0,10/milhão de ops) |
| Filas | Queues, Exchanges, Bindings | Queues e Topics/Subscriptions |
| Tamanho máximo de mensagem | 128 MB (padrão) | 256 KB (Standard) / 100 MB (Premium) |
| Retenção de mensagens | Até o disco encher / TTL configurável | 14 dias (máximo) |
| Dead-letter queue | ✅ Configurável | ✅ Nativo |
| Sessions (ordenação garantida) | ✅ Via x-single-active-consumer | ✅ Service Bus Sessions |
| Retry automático | Manual (Dead-letter + requeue) | Nativo (MaxDeliveryCount) |
| Escalabilidade horizontal | Manual (cluster Erlang) | Automática |
| Biblioteca .NET | RabbitMQ.Client (Apache 2.0) | Azure.Messaging.ServiceBus (MIT) |
| Melhor para | On-premise, multi-cloud, custo controlado | Ecossistema Azure, equipe pequena, SLA garantido |
Implementação Equivalente com Azure Service Bus#
Para migrar do RabbitMQ para o Azure Service Bus em ambiente Azure, o padrão de producer/consumer é similar, usando Azure.Messaging.ServiceBus:
// dotnet add package Azure.Messaging.ServiceBus
// --- Producer com envio em lote nativo ---
public class PedidoServiceBusProducer(ServiceBusSender sender)
{
public async Task PublicarLoteAsync(
IEnumerable<PedidoCriadoMessage> mensagens,
CancellationToken ct = default)
{
// O ServiceBusMessageBatch respeita o limite de tamanho automaticamente
using var batch = await sender.CreateMessageBatchAsync(ct);
foreach (var msg in mensagens)
{
var sbMsg = new ServiceBusMessage(
BinaryData.FromObjectAsJson(msg))
{
ContentType = "application/json",
MessageId = msg.Id.ToString()
};
if (!batch.TryAddMessage(sbMsg))
throw new InvalidOperationException(
$"Mensagem {msg.Id} excede o limite do lote");
}
await sender.SendMessagesAsync(batch, ct);
}
}
// --- Consumer com BackgroundService ---
public class PedidoServiceBusWorker(
ServiceBusProcessor processor,
IServiceScopeFactory scopeFactory,
ILogger<PedidoServiceBusWorker> logger) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
processor.ProcessMessageAsync += async args =>
{
var msg = args.Message.Body
.ToObjectFromJson<PedidoCriadoMessage>();
using var scope = scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider
.GetRequiredService<AppDbContext>();
var pedido = new Pedido
{
Id = msg.Id,
ClienteId = msg.ClienteId,
Valor = msg.Valor,
DataCriacao = msg.DataCriacao
};
await dbContext.BulkInsertAsync(
new List<Pedido> { pedido },
cancellationToken: ct);
// Confirma o processamento — remove da fila
await args.CompleteMessageAsync(args.Message, ct);
};
processor.ProcessErrorAsync += args =>
{
logger.LogError(args.Exception,
"Erro ao processar mensagem do Service Bus");
return Task.CompletedTask;
};
await processor.StartProcessingAsync(ct);
await Task.Delay(Timeout.Infinite, ct);
}
public override async Task StopAsync(CancellationToken ct)
{
await processor.StopProcessingAsync(ct);
await base.StopAsync(ct);
}
}
// Program.cs
builder.Services.AddSingleton(provider =>
{
var client = new ServiceBusClient(
"Endpoint=sb://meu-namespace.servicebus.windows.net/;...");
return client.CreateSender("pedidos-queue");
});
builder.Services.AddSingleton(provider =>
{
var client = new ServiceBusClient(
"Endpoint=sb://meu-namespace.servicebus.windows.net/;...");
return client.CreateProcessor("pedidos-queue", new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 4, // Paralelismo no consumer
AutoCompleteMessages = false // Controle manual de ACK
});
});
builder.Services.AddHostedService<PedidoServiceBusWorker>();
💡 Dica: Em produção no Azure, prefira autenticação via Managed Identity em vez de connection string, usando new ServiceBusClient("meu-namespace.servicebus.windows.net", new DefaultAzureCredential()). Isso elimina segredos na configuração.
📖 Artigo completo com exemplos de código: Gargalo em Banco de Dados: Mensageria e Paginação