Executando verificação de segurança...
4

Processamento Massivo de XMLs Fiscais: Como automatizei a recuperação de centenas de milhares de documentos sem travar o servidor

Sobre mim e este projeto

Meu nome é Cláudio Silva. Sou desenvolvedor fullstack de Fortaleza-CE, atualmente como Fullstack Pleno na Gt Gestão Contábil e fundador da Front-End Fusion, comunidade de mentoria para desenvolvedores.

Nos últimos meses, enfrentei um dos maiores desafios da minha carreira. Dentro da empresa onde atuo, lidamos com uma operação fiscal de grande escala: quase 600 empresas, com um volume médio de 16 mil XMLs por empresa por mês, entre NFC-e e CF-e. Isso rapidamente se transforma em centenas de milhares de documentos fiscais mensalmente.

E o processo era manual. Analistas acessavam o portal da SEFAZ e realizavam a recuperação nota por nota. Um trabalho repetitivo, demorado e propenso a erros.

O desafio não era apenas automatizar. Era construir uma solução que conseguisse suportar alto volume de requisições, respeitar os limites da SEFAZ sem causar bloqueios, diferenciar corretamente os fluxos de NFC-e e CF-e, ser confiável para uso em contexto contábil e, ao mesmo tempo, simples para o usuário final.

Este artigo documenta exatamente como fiz isso: desde a decisão arquitetural de usar processamento assíncrono até a implementação de controle de concorrência e rate limiting. O foco é prático, com as decisões técnicas que tomei e os problemas que precisei resolver ao longo do caminho.

Arquitetura e Visão Geral

A solução foi estruturada para ser escalável, resiliente e respeitar os limites externos da SEFAZ.

ComponenteTecnologiaPapel Estratégico
API BackendFastAPIFramework assíncrono para alta concorrência sem bloquear threads
OrquestradorAsyncio + WorkersProcessamento em background para evitar timeout na UI
Controle de ConcorrênciaSemáforosLimita requisições simultâneas para proteger a SEFAZ
Rate LimitingRate Limiter adaptativoRespeita limites de RPS (requisições por segundo) da SEFAZ
PersistênciaSupabase (PostgreSQL)Armazenamento de estado e metadados das chaves
FrontendNext.js + React + TypeScriptInterface simples que reduz fricção operacional

A decisão mais importante foi desacoplar a solicitação do processamento. Em vez de fazer o usuário esperar o processamento terminar (o que poderia levar horas), transferi essa tarefa para workers em background. A interface passou a fazer polling de status, e o servidor nunca mais travou por timeout.

Passo a Passo Técnico

1. Iniciação Assíncrona com Command Pattern

O problema que isso resolve é simples: uma requisição HTTP não pode ficar aberta por minutos ou horas. O navegador vai dar timeout, e o usuário vai achar que o sistema quebrou.

Com o Command Pattern, o usuário envia as chaves, e a API responde imediatamente com um extraction_id. O processamento real acontece em background.

# features/cfe/recuperacao_xml/api.py
@router.post("/extractions/{extraction_id}/recuperar-xmls")
async def iniciar_recuperacao(
    extraction_id: str,
    request: dict,
    current_user: CurrentUser = Depends(get_current_user),
):
    token_sefaz = request.get('sefaz_token')
    
    # Busca as chaves da extração
    chaves = await repo.get_melhores_chaves_para_recuperacao(extraction_id)
    
    # Cria o comando e dispara o handler
    command = IniciarRecuperacaoCfeCommand(
        extraction_id=extraction_id,
        chaves=chaves,
        token_sefaz=token_sefaz,
    )
    
    # Handler valida e dispara o orquestrador em background
    resultado = await handler.handle(command)
    
    # Retorna IMEDIATAMENTE (sem esperar o processamento)
    return {
        'recuperacao_id': resultado.recuperacao_id,
        'status': resultado.status,
        'estimativa_segundos': resultado.estimativa_segundos,
        'message': 'Recuperação iniciada em background'
    }

O usuário recebe a resposta em menos de 200ms. O processamento real acontece em uma task assíncrona separada.

2. Controle de Concorrência com Semáforos

A SEFAZ não aceita centenas de requisições simultâneas. Se você tentar, seu IP é bloqueado. Aprendi isso da pior forma durante os primeiros testes.

A solução foi implementar semáforos que limitam o número de requisições simultâneas. Cada semáforo age como um "portão" que só deixa passar um número controlado de tarefas por vez.

# container.py - Injeção de dependência dos semáforos
def get_recuperacao_semaphore() -> asyncio.Semaphore:
    """Semáforo específico para operações de recuperação XML."""
    return asyncio.Semaphore(45)  # Máximo 45 downloads simultâneos

def get_consulta_semaphore() -> asyncio.Semaphore:
    """Semáforo específico para consultas de protocolo na SEFAZ."""
    config = get_config()
    return asyncio.Semaphore(config.SEFAZ_DOWNLOAD_CONCORRENCIA)

No orquestrador, cada fase da recuperação usa seu semáforo específico:

# services.py - Orquestrador com controle de concorrência
class OrquestradorRecuperacao:
    def __init__(self, repository, semaforo: asyncio.Semaphore):
        self._repo = repository
        self._semaforo = semaforo  # Controla concorrência

    async def executar(self, recuperacao: Recuperacao, token_sefaz: str):
        # Cada fase executa dentro do semáforo
        async with self._semaforo:
            await self._fase_consulta(recuperacao)
        
        async with self._semaforo:
            await self._fase_download(recuperacao, token_sefaz)

Isso garantiu que, mesmo processando 50 mil chaves, nunca mais tivemos bloqueio de IP da SEFAZ.

3. Rate Limiting Adaptativo

Além do controle de concorrência (quantas tarefas SIMULTÂNEAS), precisávamos controlar a taxa de requisições por segundo (RPS). A SEFAZ tem limites claros: 45 RPS para downloads, 80 RPS para consultas.

Implementei um RateLimiter que insere delays automáticos entre requisições:

# infrastructure.py - Rate Limiter para respeitar limites da SEFAZ
class RateLimiter:
    def __init__(self, rps: float):
        self.interval = 1.0 / rps  # Intervalo mínimo entre requisições
        self._last_request = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self._last_request
            wait = max(0.0, self.interval - elapsed)
            if wait > 0:
                await asyncio.sleep(wait)  # Espera o tempo necessário
            self._last_request = time.time()

E no adapter de download, aplicamos o rate limiter antes de cada chamada:

class CfeDownloadRateLimiter:
    async def download_com_controle(self, client, chave):
        async with self._semaphore:  # Controle de concorrência
            await self._rate_limiter.acquire()  # Controle de RPS
            return await client.download(chave)

O tradeoff foi aceitar que o processamento total levaria alguns minutos em vez de segundos, mas em troca ganhamos estabilidade absoluta e zero bloqueios.

4. Diferenciação de Fluxos: NFC-e vs CF-e

Cada tipo de documento tem um fluxo diferente:

  • NFC-e (modelo 65): Consulta protocolo → Download → ZIP
  • CF-e (modelo 59): Download direto → ZIP (sem consulta de protocolo)

A solução foi implementar roteamento dinâmico baseado no tipo_nota da extração:

# api.py - Roteamento baseado no tipo da nota
tipo_nota = extracao.tipo_nota

if tipo_nota == '59':  # CF-e
    if not config.CFE_HABILITADO:
        raise HTTPException(status_code=503, detail='CF-e não disponível')
    
    # Usa fluxo CF-e (download direto)
    handler = IniciarRecuperacaoCfeHandler(repo)
    command = IniciarRecuperacaoCfeCommand(...)
    resultado = await handler.handle(command)

else:  # NFC-e (65) - fluxo padrão
    handler = IniciarRecuperacaoHandler(repo)
    command = IniciarRecuperacaoCommand(...)
    resultado = await handler.handle(command)

Aprendi uma distinção importante: não adianta ter um fluxo genérico. Cada tipo de documento fiscal tem suas próprias regras, e a arquitetura precisa refletir isso desde o início.

Problemas que Enfrentei e Como Resolvi

1. Timeout na UI em extrações longas

Causa: O usuário ficava esperando a requisição HTTP retornar. Com 50 mil chaves, o processamento levava minutos, e o navegador dava timeout.

Resolução: Mudei para um modelo de polling. A interface consulta o endpoint de status a cada 3 segundos e atualiza uma barra de progresso. O usuário vê o andamento em tempo real e não fica com a sensação de "sistema travado".

# Interface faz polling a cada 3 segundos
const interval = setInterval(async () => {
    const status = await fetch(`/v1/status/${extraction_id}`);
    const data = await status.json();
    updateProgress(data.progresso.percentual);
    if (data.status === 'zip_gerado') clearInterval(interval);
}, 3000);

2. Bloqueio de IP pela SEFAZ

Causa: Nas primeiras versões, eu disparava todas as requisições em paralelo sem controle. A SEFAZ interpretava como ataque e bloqueava o IP da VPS.

Resolução: Implementei duas camadas de proteção:

  • Semáforos: limitam o número de requisições simultâneas (ex: 45 downloads ao mesmo tempo)
  • Rate Limiter: insere delays automáticos para respeitar o limite de RPS (ex: 45 requisições por segundo)

O resultado foi um fluxo "cadenciado" que a SEFAZ aceita sem problemas.

3. Estouro de Memória ao Gerar ZIP

Causa: Tentar carregar 50 mil XMLs na memória RAM para depois compactar tudo de uma vez. Em testes, a RAM chegava a 8GB antes do servidor cair.

Resolução: Implementei geração de ZIP em streaming. Os XMLs são escritos no arquivo ZIP à medida que são baixados, sem acumular tudo na memória. O consumo de RAM se manteve constante em cerca de 512MB, independentemente do volume de notas.

Resultados Alcançados

MétricaAntes (Manual/Síncrono)Depois (Automatizado/Assíncrono)
Tempo por empresaDiasMinutos
Limite de notas por loteImpossível processar em lote50.000+ notas
Tempo de resposta da UI> 60 segundos (timeout)< 200ms (imediato)
Consumo de RAM (pico)8GB+ (instável)~512MB (constante)
Taxa de sucesso na SEFAZ~65% (bloqueios frequentes)99.8%
Esforço operacionalTime dedicadoZero intervenção manual

O impacto foi direto e mensurável. O que antes levava dias, hoje leva minutos por empresa. Processamento de milhares de documentos em minutos, redução significativa de trabalho manual, aumento de confiabilidade e escala operacional sem aumento proporcional de esforço.

Fontes e Referências

Conclusão

Código bem escrito resolve problemas de negócio, não só técnicos. Esse projeto reforçou algo importante para mim: tecnologia bem aplicada não é sobre complexidade. É sobre resolver problemas reais de forma eficiente.

A adoção de processamento assíncrono, controle de concorrência com semáforos e rate limiting adaptativo transformou uma tarefa manual que levava dias em um processo automatizado de minutos. A equipe da GT Gestão Contábil passou a operar em escala sem aumentar o esforço operacional.

Se você está enfrentando problemas de instabilidade ao lidar com grandes volumes de dados ou integrações externas lentas (SEFAZ, bancos, gateways de pagamento), considere adotar este fluxo de orquestração assíncrona. O investimento inicial na arquitetura se paga em horas economizadas de suporte e estabilidade operacional.

E no fim, ser dev é isso. Entregar valor de ponta a ponta.


Quer falar comigo?

Carregando publicação patrocinada...
1

Boa, e obrigado por compartilhar.

Resolveu um problema real, que é o que importa.

Mas lendo a stack toda, eu só conseguia imaginar que em outro mundo isso terminava em um one-liner com xargs curl && zip e muito menos código para manter.