Executando verificação de segurança...
6

[PITCH] 🤖 Agents + 🚦 Pipelines = TAgent com Injeção de dados, Execução Condicional e Concorrência

Fala, pessoal do TabNews! 🚀

Hoje venho compartilhar uma evolução significativa no TAgent, A versão 0.7.1 trouxe um sistema de Pipelines que está interessante a forma como construir workflows complexos de IA. (Eu achei) 😂

O Problema que Estava Enfrentando

Quem já trabalhou com agentes de IA sabe que rapidamente você sai do "hello world" e precisa de algo mais sofisticado:

  • Workflows Multi-etapa: Análise → Processamento → Decisão → Ação
  • Execução Condicional: "Se sentiment < 4, escalate para suporte"
  • Concorrência: Múltiplas análises em paralelo
  • Dependências Complexas: Etapa C só roda se A e B completaram

Com agentes tradicionais, você acaba com código espaguete, difícil de manter e debugar.

A Solução: TAgent Pipelines

O sistema de Pipelines do TAgent resolve isso de forma elegante:

from tagent.pipeline import PipelineBuilder
from tagent.pipeline.conditions import IsGreaterThan, IsLessThan, And, Or
from tagent.pipeline.executor import PipelineExecutor, PipelineExecutorConfig
from tagent.config import TAgentConfig
from pydantic import BaseModel, Field

# Definindo schemas estruturados
class SentimentAnalysis(BaseModel):
    score: float = Field(description="Score de sentimento (0-10)")
    confidence: float = Field(description="Confiança da análise (0-1)")
    keywords: list[str] = Field(description="Palavras-chave extraídas")

class CustomerProfile(BaseModel):
    tier: str = Field(description="bronze, silver, gold, platinum")
    interaction_count: int = Field(description="Número de interações")
    
class EscalationTicket(BaseModel):
    priority: str = Field(description="low, medium, high, critical")
    department: str = Field(description="Departamento responsável")
    estimated_resolution: str = Field(description="Tempo estimado")

Exemplo Prático: Sistema de Atendimento ao Cliente

Vamos construir um pipeline completo que analisa feedback de clientes e toma ações baseadas no resultado:

# Pipeline de análise de feedback com lógica condicional
pipeline = PipelineBuilder(
    name="customer_feedback_pipeline",
    description="Pipeline inteligente para processar feedback de clientes"
).step(
    name="analyze_sentiment",
    goal="Analisar sentimento do feedback do cliente",
    output_schema=SentimentAnalysis,
    execution_mode=ExecutionMode.CONCURRENT
).step(
    name="get_customer_profile", 
    goal="Buscar perfil do cliente no banco de dados",
    output_schema=CustomerProfile,
    execution_mode=ExecutionMode.CONCURRENT,
    depends_on=[]  # Roda em paralelo com sentiment
).step(
    name="escalate_critical",
    goal="Escalar para gerência casos críticos",
    depends_on=["analyze_sentiment", "get_customer_profile"],
    condition=Or(
        IsLessThan("analyze_sentiment.score", 3.0),
        And(
            IsLessThan("analyze_sentiment.score", 5.0),
            IsGreaterThan("get_customer_profile.tier", "gold")
        )
    ),
    output_schema=EscalationTicket
).step(
    name="auto_response_positive",
    goal="Resposta automática para feedback positivo",
    depends_on=["analyze_sentiment"],
    condition=And(
        IsGreaterThan("analyze_sentiment.score", 8.0),
        IsGreaterThan("analyze_sentiment.confidence", 0.8)
    )
).step(
    name="assign_to_specialist",
    goal="Atribuir para especialista técnico",
    depends_on=["analyze_sentiment"],
    condition=Or(
        Contains("analyze_sentiment.keywords", "bug"),
        Contains("analyze_sentiment.keywords", "erro"),
        Contains("analyze_sentiment.keywords", "falha")
    )
).step(
    name="standard_response",
    goal="Resposta padrão para casos neutros",
    depends_on=["analyze_sentiment"],
    condition=And(
        IsGreaterThan("analyze_sentiment.score", 4.0),
        IsLessThan("analyze_sentiment.score", 8.0)
    )
).build()

Visualização do Fluxo

Aqui está como o pipeline funciona na prática:

┌─────────────────────── CUSTOMER FEEDBACK PIPELINE ───────────────────────────┐
│                                                                              │
│  INPUT: "O sistema está muito lento e bugado, preciso de ajuda urgente!"     │
│                                                                              │
│  ┌──────────────────┐    ┌────────────────────┐                              │
│  │ analyze_sentiment│    │get_customer_profile│                              │
│  │                  │    │                    │                              │
│  │ CONCURRENT       │    │ CONCURRENT         │                              │
│  │                  │    │                    │                              │
│  │ Score: 2.3       │    │ Tier: platinum     │                              │
│  │ Keywords: [bug,  │    │ Interactions: 47   │                              │
│  │  lento, ajuda]   │    │                    │                              │
│  └──────────────────┘    └────────────────────┘                              │
│           │                        │                                         │
│           └────────┬───────────────┘                                         │
│                    │                                                         │
│                    ▼                                                         │
│         ┌─────────────────┐                                                  │
│         │ DECISION POINT  │                                                  │
│         │                 │                                                  │
│         │ IF score < 3.0  │                                                  │
│         │ OR (score < 5.0 │                                                  │
│         │ AND tier > gold)│                                                  │
│         └─────────────────┘                                                  │
│                    │                                                         │
│                    ▼                                                         │
│         ┌─────────────────┐        ┌────────────────────┐                    │
│         │escalate_critical│        │assign_to_specialist│                    │
│         │                 │        │                    │                    │
│         │ - EXECUTED      │        │ - EXECUTED         │                    │
│         │                 │        │                    │                    │
│         │ Priority: HIGH  │        │ Department: Tech   │                    │
│         │ Dept: Management│        │ ETA: 2 hours       │                    │
│         └─────────────────┘        └────────────────────┘                    │
│                                                                              │
│         ┌─────────────────┐        ┌─────────────────┐                       │
│         │auto_response_pos│        │standard_response│                       │
│         │                 │        │                 │                       │
│         │ - SKIPPED       │        │ - SKIPPED       │                       │
│         │                 │        │                 │                       │
│         │ (score too low) │        │ (score too low) │                       │
│         └─────────────────┘        └─────────────────┘                       │
│                                                                              │
│  OUTPUT: Escalated to management + Technical specialist assigned             │
└──────────────────────────────────────────────────────────────────────────────┘

Execução e Monitoramento

# Configuração do executor
config = TAgentConfig(
    model="openrouter/google/gemini-2.5-flash",
    verbose=True
)

executor_config = PipelineExecutorConfig(
    max_concurrent_steps=5,
    enable_persistence=True,
    retry_failed_steps=True,
    max_retries=2
)

# Execução com UI em tempo real
executor = PipelineExecutor(pipeline, config, executor_config)
result = await executor.execute()

📊 Pipeline Dashboard:

Casos de Uso

1. Pipeline de Análise de Código

code_review_pipeline = PipelineBuilder(
    name="code_review_pipeline",
    description="Pipeline automatizado de code review"
).step(
    name="last_commit",
    goal="Executar análise estática do código",
    execution_mode=ExecutionMode.CONCURRENT,
    tools=[last_commit_tool]
).step(
    name="static_analysis",
    goal="Executar análise estática do código",
    execution_mode=ExecutionMode.CONCURRENT,
    dependes_on=["last_commit"],
    read_data=[
       "last_commit.diff",
    ]
).step(
    name="security_scan", 
    goal="Escanear vulnerabilidades de segurança",
    execution_mode=ExecutionMode.CONCURRENT,
    dependes_on=["last_commit"],
    read_data=[
       "last_commit.diff",
    ]
).step(
    name="performance_check",
    goal="Verificar performance e complexidade",
    execution_mode=ExecutionMode.CONCURRENT,
    dependes_on=["last_commit"],
    read_data=[
       "last_commit.diff",
    ]
).step(
    name="lint",
    goal="Roda lint",
    execution_mode=ExecutionMode.CONCURRENT,
    dependes_on=["last_commit"],
    read_data=[
       "last_commit.diff"
    ]
).step(
    name="generate_report",
    goal="Gerar relatório consolidado",
    depends_on=["static_analysis", "security_scan", "performance_check", "lint"],
    condition=DataExists("static_analysis")
).step(
    name="send_report_email",
    goal="Envia email com report e sugestões para o dev@some_team.com",
    depends_on=["generate_report"],
    read_data=[
       "generate_report.report"
    ],
    tools=[send_email]
).step(
    name="auto_fix_suggestions",
    goal="Sugerir correções automáticas",
    depends_on=["generate_report"],
    condition=IsLessThan("generate_report.severity_score", 7.0)
).step(
    name="send_suggestion_slack",
    goal="Envia uma mensagem no canal #dev_review no slack",
    depends_on=["auto_fix_suggestions"],
    condition=DataExists("auto_fix_suggestions"),
    read_data=[
       "auto_fix_suggestions.suggestions"
    ],
    tools=[
       slack_channel_message
    ]
).build()

2. Pipeline de Processamento de Dados

data_pipeline = PipelineBuilder(
    name="data_processing_pipeline",
    description="Pipeline ETL inteligente"
).step(
    name="validate_data",
    goal="Validar qualidade dos dados de entrada"
).step(
    name="clean_data",
    goal="Limpar e normalizar dados",
    depends_on=["validate_data"],
    condition=IsGreaterThan("validate_data.quality_score", 0.7)
).step(
    name="transform_data",
    goal="Aplicar transformações necessárias",
    depends_on=["clean_data"]
).step(
    name="generate_insights",
    goal="Gerar insights usando IA",
    depends_on=["transform_data"],
    execution_mode=ExecutionMode.CONCURRENT
).step(
    name="create_dashboard",
    goal="Criar dashboard interativo",
    depends_on=["generate_insights"]
).build()

Recursos Avançados

1. Execução Condicional Inteligente

# Condições complexas com operadores lógicos
condition = And(
    IsGreaterThan("sentiment.score", 8.0),
    Or(
        Contains("keywords", "excelente"),
        Contains("keywords", "perfeito")
    ),
    Not(IsEmpty("customer.email"))
)

2. Injeção de Dados Automática

.step(
    name="generate_response",
    goal="Gerar resposta personalizada",
    depends_on=["analyze_sentiment", "get_customer_profile"],
    read_data=[
        "analyze_sentiment.score",
        "get_customer_profile.tier",
        "analyze_sentiment.keywords"
    ]
)

3. Tratamento de Erros Robusto

executor_config = PipelineExecutorConfig(
    max_concurrent_steps=3,
    retry_failed_steps=True,
    max_retries=3,
    retry_delay=1.0,
    enable_persistence=True
)

Comparação com Outras Soluções

RecursoTAgent Pipelines
Execução Condicional✅ Avançada
Concorrência✅ Nativa
Type Safety✅ Pydantic
Retry Logic✅ Configurável
Dependency Graph✅ Automático

Instalação e Começando

# Instalar TAgent com suporte a pipelines
pip install "tagent[pipeline]"

# Ou instalar tudo
pip install "tagent[all]"
# Seu primeiro pipeline
from tagent.pipeline import PipelineBuilder
from tagent.pipeline.executor import PipelineExecutor
from tagent.config import TAgentConfig
from pydantic import BaseModel, Field

def fetch_content():
   return "Tá ruim o dia hoje"
   
class ContentOutput(BaseModel):
    value: str = Field(..., description="Conteúdo do texto")

pipeline = PipelineBuilder(
    name="meu_primeiro_pipeline",
    description="Análise simples de texto"
).step(
    name="content",
    goal="Carregue o texto atual",
    tool=[fetch_content],
    output_schema: ContentOutput
).step(
    name="summarize", 
    goal="Deixe o texto com tom positivo",
    depends_on=["analyze"],
    read_data=["content.value"] # Inject this data direct on the context
).build()

config = TAgentConfig(model="any model from litellm")
executor = PipelineExecutor(pipeline, config)
result = await executor.execute()

O TAgent Pipelines tem uma forma interessante para lidar com agentes de IA. Com execução condicional, concorrência nativa e uma API intuitiva, você pode criar workflows sofisticados sem a complexidade tradicional.

Links:

O que vocês acham?

Carregando publicação patrocinada...