Executando verificação de segurança...
1

Gargalo em Banco de Dados: Mensageria e Paginação

Introdução#

Todo desenvolvedor .NET que trabalha com aplicações de médio ou grande porte já esbarrou em pelo menos um destes cenários: uma rotina noturna que precisa gravar 50 mil pedidos no banco, uma consulta de relatório que traz 300 mil registros de uma única vez e trava a aplicação, ou uma API que demora 8 segundos para responder porque o EF Core está executando 10 mil INSERT individuais em sequência.

O SQL Server e o Oracle são bancos robustos e maduros, mas nenhum deles foi projetado para receber milhares de gravações em rajada, nem para retornar centenas de milhares de linhas de uma só vez sem custo. O problema, na maioria dos casos, não é o banco — é a forma como a aplicação C# interage com ele.

Neste artigo vamos explorar as duas faces do gargalo: escrita em massa e leitura de grandes volumes. Para cada uma, você vai ver a causa raiz, as particularidades de SQL Server e Oracle com EF Core 8.0+, e a solução prática — mensageria para gravação e paginação eficiente para leitura. Todo o código é produção-ready e compatível com .NET 8/9.

Pré-requisitos: Conhecimento básico de C# e EF Core. Recomenda-se ter lido o artigo sobre programação assíncrona com C# antes de continuar.

📦 Código-fonte: A implementação completa deste artigo está no repositório blog-zocateli-sample no GitHub. Clone, explore e adapte ao seu contexto.

O Gargalo de Escrita: Por Que Gravar Milhares de Registros Dói#

O Custo Real de Cada INSERT#

Quando você salva uma lista de entidades com o EF Core da forma mais comum, algo assim acontece:

// ❌ Abordagem ingênua — InsertOne por vez
foreach (var pedido in listaDe50MilPedidos)
{
    await context.Pedidos.AddAsync(pedido);
    await context.SaveChangesAsync(); // PROBLEMA: 1 roundtrip por registro!
}

Cada SaveChangesAsync() dentro do loop representa:

  • Uma transação aberta → commit → fechada no banco
  • Um roundtrip de rede (latência de 1–5ms por chamada)
  • Log do banco de dados sendo escrito para cada operação
  • Locks de linha sendo alocados e liberados 50 mil vezes
    Para 50.000 pedidos com 2ms de latência por roundtrip, isso representa 100 segundos de processamento puro de I/O. E isso é no melhor cenário, sem contenção.

Particularidades: SQL Server vs Oracle#

Ainda que a solução seja similar nos dois bancos, há diferenças importantes:

AspectoSQL ServerOracle
Bulk Insert nativoBULK INSERT / SqlBulkCopyODP.NET BulkCopy / INSERT ALL
Tamanho máximo de lote padrão1.000 linhas por INSERTDepende do ArrayBindCount
Sequências / IdentityIDENTITY ou SEQUENCEApenas SEQUENCE (obrigatório)
Rollback de bulkPode ser minimamente logadoSempre logado (redo log)
EF Core providerMicrosoft.EntityFrameworkCore.SqlServerOracle.EntityFrameworkCore

No Oracle, um detalhe crítico é que o provider oficial (Oracle.EntityFrameworkCore 8.x) exige que você configure a geração de chaves via SEQUENCE + TRIGGER (ou GENERATED ALWAYS AS IDENTITY no Oracle 12c+). Ignorar isso em gravações em massa vai gerar N chamadas extras ao banco só para obter os IDs.

// Oracle: configuração correta no OnModelCreating
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Pedido>(entity =>
    {
        entity.HasKey(e => e.Id);

        // Oracle 12c+: identity nativo, evita roundtrip extra por ID
        entity.Property(e => e.Id)
              .UseIdentityColumn();

        // Alternativa para Oracle 11g/legado
        // entity.Property(e => e.Id)
        //       .HasDefaultValueSql("SEQ_PEDIDOS.NEXTVAL");
    });
}

A Solução Imediata: SaveChanges em Lote com EF Core 8#

Antes de introduzir filas, a primeira otimização é mover o SaveChangesAsync() para fora do loop e usar o chunk para não sobrecarregar o contexto:

// ✅ Melhor: batch por chunk + 1 SaveChanges por lote
public async Task GravarPedidosEmLoteAsync(
    IEnumerable<Pedido> pedidos,
    CancellationToken ct = default)
{
    const int tamanhoLote = 500;

    foreach (var chunk in pedidos.Chunk(tamanhoLote))
    {
        await context.Pedidos.AddRangeAsync(chunk, ct);
        await context.SaveChangesAsync(ct);

        // Limpar o ChangeTracker para não acumular entidades em memória
        context.ChangeTracker.Clear();
    }
}

💡 Dica: O método Chunk() foi introduzido no .NET 6. Combinado com ChangeTracker.Clear(), evita que o contexto EF Core cresça indefinidamente ao rastrear dezenas de milhares de entidades.

ExecuteInsertAsync e BulkInsert no EF Core 8#

O EF Core 7 trouxe ExecuteUpdateAsync e ExecuteDeleteAsync. O EF Core 8 melhorou o suporte a operações em massa. Para cenários de altíssima performance, use a biblioteca EFCore.BulkExtensions:

// ✅ BulkInsert para SQL Server e Oracle (via EFCore.BulkExtensions)
// dotnet add package EFCore.BulkExtensions
public async Task BulkInsertPedidosAsync(
    List<Pedido> pedidos,
    CancellationToken ct = default)
{
    var bulkConfig = new BulkConfig
    {
        BatchSize = 1000,
        UseTempDB = true,              // SQL Server: tabela temporária para staging
        SetOutputIdentity = true,      // Preenche os IDs gerados pelo banco
        PreserveInsertOrder = true
    };

    await context.BulkInsertAsync(pedidos, bulkConfig, cancellationToken: ct);
}

Com BulkInsert, 50.000 registros que levavam 100 segundos com SaveChanges individual passam a ser gravados em 2–4 segundos no SQL Server, e em 3–6 segundos no Oracle.

Mensageria: A Solução Arquitetural para Gravação em Massa#

Otimizar o próprio INSERT resolve o sintoma, mas não a causa raiz. O problema real é que a aplicação está tentando processar um volume enorme de forma síncrona, bloqueando a thread e a requisição enquanto grava. A solução arquitetural correta é desacoplar a recepção da gravação usando uma fila de mensagens.

Como a Mensageria Resolve o Problema#

Em vez de gravar diretamente no banco ao receber os dados, a aplicação:

  • Publica os registros em uma fila (RabbitMQ, Azure Service Bus, etc.) — operação rápida (~1ms por mensagem)
  • Retorna imediatamente para o cliente com 202 Accepted
  • Um Consumer separado lê a fila em lotes e grava no banco com bulk insert

Essa separação traz benefícios além da performance:

  • Resiliência: se o banco cair temporariamente, as mensagens ficam na fila. Naão há perda de dados.
  • Controle de fluxo: o consumer pode processar na velocidade que o banco suporta
  • Backpressure natural: a fila amortece picos de carga
  • Observabilidade: você pode monitorar o tamanho da fila e saber exatamente o backlog pendente

Implementação com RabbitMQ.Client#

A biblioteca oficial RabbitMQ.Client é 100% open-source (Apache 2.0) e fornece acesso direto ao protocolo AMQP. Combinada com o BackgroundService do .NET, implementamos um consumer que acumula mensagens em lote e grava tudo de uma vez com BulkInsert.

// dotnet add package RabbitMQ.Client
// dotnet add package EFCore.BulkExtensions

// --- Mensagem ---
public record PedidoCriadoMessage(
    Guid Id,
    string ClienteId,
    decimal Valor,
    DateTime DataCriacao);

// --- Producer ---
public class PedidoProducer(IConnection connection)
{
    private const string QueueName = "pedidos-queue";

    public async Task PublicarAsync(
        PedidoCriadoMessage mensagem,
        CancellationToken ct = default)
    {
        await using var channel = await connection.CreateChannelAsync(cancellationToken: ct);

        await channel.QueueDeclareAsync(
            queue: QueueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            cancellationToken: ct);

        var json = JsonSerializer.SerializeToUtf8Bytes(mensagem);
        var props = new BasicProperties { DeliveryMode = DeliveryModes.Persistent };

        await channel.BasicPublishAsync(
            exchange: string.Empty,
            routingKey: QueueName,
            mandatory: false,
            basicProperties: props,
            body: json,
            cancellationToken: ct);
    }
}

// --- Consumer em lote (BackgroundService) ---
public class PedidoConsumerWorker(
    IConnection connection,
    IServiceScopeFactory scopeFactory,
    ILogger<PedidoConsumerWorker> logger) : BackgroundService
{
    private const string QueueName = "pedidos-queue";
    private const int TamanhoLote = 500;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await using var channel = await connection.CreateChannelAsync(cancellationToken: ct);

        await channel.QueueDeclareAsync(
            queue: QueueName, durable: true,
            exclusive: false, autoDelete: false,
            cancellationToken: ct);

        // Backpressure: limita mensagens em voo para não sobrecarregar a memória
        await channel.BasicQosAsync(
            prefetchSize: 0,
            prefetchCount: TamanhoLote + 100,
            global: false,
            cancellationToken: ct);

        var lote = new List<(ulong Tag, PedidoCriadoMessage Msg)>();

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (_, ea) =>
        {
            var msg = JsonSerializer.Deserialize<PedidoCriadoMessage>(ea.Body.Span)!;
            lote.Add((ea.DeliveryTag, msg));

            if (lote.Count >= TamanhoLote)
                await ProcessarLoteAsync(channel, lote, ct);
        };

        await channel.BasicConsumeAsync(
            queue: QueueName,
            autoAck: false,
            consumer: consumer,
            cancellationToken: ct);

        // Timer de flush: força o processamento mesmo que o lote não esteja cheio
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
        while (await timer.WaitForNextTickAsync(ct))
        {
            if (lote.Count > 0)
                await ProcessarLoteAsync(channel, lote, ct);
        }
    }

    private async Task ProcessarLoteAsync(
        IChannel channel,
        List<(ulong Tag, PedidoCriadoMessage Msg)> lote,
        CancellationToken ct)
    {
        var snapshot = lote.ToList();
        lote.Clear();

        var pedidos = snapshot.Select(x => new Pedido
        {
            Id           = x.Msg.Id,
            ClienteId    = x.Msg.ClienteId,
            Valor        = x.Msg.Valor,
            DataCriacao  = x.Msg.DataCriacao
        }).ToList();

        using var scope    = scopeFactory.CreateScope();
        var dbContext       = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var bulkConfig      = new BulkConfig { BatchSize = 500 };

        await dbContext.BulkInsertAsync(pedidos, bulkConfig, cancellationToken: ct);

        // ACK múltiplo: confirma todo o lote de uma só vez
        await channel.BasicAckAsync(
            deliveryTag: snapshot[^1].Tag,
            multiple: true,
            cancellationToken: ct);

        logger.LogInformation("Lote de {Count} pedidos gravado", pedidos.Count);
    }
}

Registrando no Program.cs#

// Program.cs
// dotnet add package RabbitMQ.Client
builder.Services.AddSingleton<IConnection>(_ =>
{
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };
    // CreateConnectionAsync retorna Task — resolvemos aqui para o DI container
    return factory.CreateConnectionAsync().GetAwaiter().GetResult();
});

builder.Services.AddScoped<PedidoProducer>();
builder.Services.AddHostedService<PedidoConsumerWorker>();

⚠️ Atenção: No Oracle, o BulkInsert do EFCore.BulkExtensions requer o Oracle Data Provider for .NET (ODP.NET). Certifique-se de registrar o provider correto e de que as sequences foram configuradas corretamente no modelo, ou a inserção em massa vai falhar com erro de constraint de chave primária.

RabbitMQ vs Azure Service Bus: Qual Escolher?#

As duas opções resolvem o mesmo problema — desacoplar produção e consumo de mensagens — mas com modelos operacionais e econômicos bem distintos. A escolha depende do seu contexto: infraestrutura self-hosted vs cloud managed, custo variável vs fixo, e grau de controle desejado.

CritérioRabbitMQAzure Service Bus
TipoOpen-source (MPL 2.0), self-hostedPaaS gerenciado pela Microsoft
ProtocoloAMQP 0-9-1 (nativo), AMQP 1.0, MQTT, STOMPAMQP 1.0
HospedagemContainer próprio, VM, KubernetesAzure (sem infra para gerenciar)
CustoInfraestrutura + operação (seu time)Pay-per-use (~$0,10/milhão de ops)
FilasQueues, Exchanges, BindingsQueues e Topics/Subscriptions
Tamanho máximo de mensagem128 MB (padrão)256 KB (Standard) / 100 MB (Premium)
Retenção de mensagensAté o disco encher / TTL configurável14 dias (máximo)
Dead-letter queue✅ Configurável✅ Nativo
Sessions (ordenação garantida)✅ Via x-single-active-consumer✅ Service Bus Sessions
Retry automáticoManual (Dead-letter + requeue)Nativo (MaxDeliveryCount)
Escalabilidade horizontalManual (cluster Erlang)Automática
Biblioteca .NETRabbitMQ.Client (Apache 2.0)Azure.Messaging.ServiceBus (MIT)
Melhor paraOn-premise, multi-cloud, custo controladoEcossistema Azure, equipe pequena, SLA garantido

Implementação Equivalente com Azure Service Bus#

Para migrar do RabbitMQ para o Azure Service Bus em ambiente Azure, o padrão de producer/consumer é similar, usando Azure.Messaging.ServiceBus:

// dotnet add package Azure.Messaging.ServiceBus

// --- Producer com envio em lote nativo ---
public class PedidoServiceBusProducer(ServiceBusSender sender)
{
    public async Task PublicarLoteAsync(
        IEnumerable<PedidoCriadoMessage> mensagens,
        CancellationToken ct = default)
    {
        // O ServiceBusMessageBatch respeita o limite de tamanho automaticamente
        using var batch = await sender.CreateMessageBatchAsync(ct);

        foreach (var msg in mensagens)
        {
            var sbMsg = new ServiceBusMessage(
                BinaryData.FromObjectAsJson(msg))
            {
                ContentType = "application/json",
                MessageId   = msg.Id.ToString()
            };

            if (!batch.TryAddMessage(sbMsg))
                throw new InvalidOperationException(
                    $"Mensagem {msg.Id} excede o limite do lote");
        }

        await sender.SendMessagesAsync(batch, ct);
    }
}

// --- Consumer com BackgroundService ---
public class PedidoServiceBusWorker(
    ServiceBusProcessor processor,
    IServiceScopeFactory scopeFactory,
    ILogger<PedidoServiceBusWorker> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        processor.ProcessMessageAsync += async args =>
        {
            var msg = args.Message.Body
                          .ToObjectFromJson<PedidoCriadoMessage>();

            using var scope   = scopeFactory.CreateScope();
            var dbContext      = scope.ServiceProvider
                                      .GetRequiredService<AppDbContext>();
            var pedido         = new Pedido
            {
                Id          = msg.Id,
                ClienteId   = msg.ClienteId,
                Valor       = msg.Valor,
                DataCriacao = msg.DataCriacao
            };

            await dbContext.BulkInsertAsync(
                new List<Pedido> { pedido },
                cancellationToken: ct);

            // Confirma o processamento — remove da fila
            await args.CompleteMessageAsync(args.Message, ct);
        };

        processor.ProcessErrorAsync += args =>
        {
            logger.LogError(args.Exception,
                "Erro ao processar mensagem do Service Bus");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync(ct);
        await Task.Delay(Timeout.Infinite, ct);
    }

    public override async Task StopAsync(CancellationToken ct)
    {
        await processor.StopProcessingAsync(ct);
        await base.StopAsync(ct);
    }
}

// Program.cs
builder.Services.AddSingleton(provider =>
{
    var client = new ServiceBusClient(
        "Endpoint=sb://meu-namespace.servicebus.windows.net/;...");
    return client.CreateSender("pedidos-queue");
});

builder.Services.AddSingleton(provider =>
{
    var client = new ServiceBusClient(
        "Endpoint=sb://meu-namespace.servicebus.windows.net/;...");
    return client.CreateProcessor("pedidos-queue", new ServiceBusProcessorOptions
    {
        MaxConcurrentCalls   = 4,   // Paralelismo no consumer
        AutoCompleteMessages = false // Controle manual de ACK
    });
});

builder.Services.AddHostedService<PedidoServiceBusWorker>();

💡 Dica: Em produção no Azure, prefira autenticação via Managed Identity em vez de connection string, usando new ServiceBusClient("meu-namespace.servicebus.windows.net", new DefaultAzureCredential()). Isso elimina segredos na configuração.


📖 Artigo completo com exemplos de código: Gargalo em Banco de Dados: Mensageria e Paginação

Carregando publicação patrocinada...
2

Boa.
So queria mencionar o Kafka como uma terceira opção — especialmente via Confluent Platform — pra quem está lidando com volumes realmente absurdos onde RabbitMQ e Service Bus já começam a suar frio.

Um ponto que acho interessante é o Kafka Connect com JDBC Sink Connector: em vez de escrever um consumer do zero, você configura o batch size e ele cuida do gargalo de escrita. Não é mágica, mas reduz bastante o código que você precisa manter.

O ksqlDB também entra bem no jogo da leitura eficiente — dá pra criar materialized views sobre os streams e tirar um pouco da pressão do banco transacional. E pra quem já sofreu com incompatibilidade de contrato entre producer e consumer, o Schema Registry com Avro ou Protobuf resolve isso de forma bem elegante, bem melhor do que torcer pro JSON continuar funcionando depois de um refactor.

Claro que é uma stack bem mais pesada, se for pelo confluent não e barato e não faz sentido pra todo projeto. Mas pra volumes extremos, vale conhecer.