[PITCH] 🤖 Agents + 🚦 Pipelines = TAgent com Injeção de dados, Execução Condicional e Concorrência
Fala, pessoal do TabNews! 🚀
Hoje venho compartilhar uma evolução significativa no TAgent, A versão 0.7.1 trouxe um sistema de Pipelines que está interessante a forma como construir workflows complexos de IA. (Eu achei) 😂
O Problema que Estava Enfrentando
Quem já trabalhou com agentes de IA sabe que rapidamente você sai do "hello world" e precisa de algo mais sofisticado:
- Workflows Multi-etapa: Análise → Processamento → Decisão → Ação
- Execução Condicional: "Se sentiment < 4, escalate para suporte"
- Concorrência: Múltiplas análises em paralelo
- Dependências Complexas: Etapa C só roda se A e B completaram
Com agentes tradicionais, você acaba com código espaguete, difícil de manter e debugar.
A Solução: TAgent Pipelines
O sistema de Pipelines do TAgent resolve isso de forma elegante:
from tagent.pipeline import PipelineBuilder
from tagent.pipeline.conditions import IsGreaterThan, IsLessThan, And, Or
from tagent.pipeline.executor import PipelineExecutor, PipelineExecutorConfig
from tagent.config import TAgentConfig
from pydantic import BaseModel, Field
# Definindo schemas estruturados
class SentimentAnalysis(BaseModel):
score: float = Field(description="Score de sentimento (0-10)")
confidence: float = Field(description="Confiança da análise (0-1)")
keywords: list[str] = Field(description="Palavras-chave extraídas")
class CustomerProfile(BaseModel):
tier: str = Field(description="bronze, silver, gold, platinum")
interaction_count: int = Field(description="Número de interações")
class EscalationTicket(BaseModel):
priority: str = Field(description="low, medium, high, critical")
department: str = Field(description="Departamento responsável")
estimated_resolution: str = Field(description="Tempo estimado")
Exemplo Prático: Sistema de Atendimento ao Cliente
Vamos construir um pipeline completo que analisa feedback de clientes e toma ações baseadas no resultado:
# Pipeline de análise de feedback com lógica condicional
pipeline = PipelineBuilder(
name="customer_feedback_pipeline",
description="Pipeline inteligente para processar feedback de clientes"
).step(
name="analyze_sentiment",
goal="Analisar sentimento do feedback do cliente",
output_schema=SentimentAnalysis,
execution_mode=ExecutionMode.CONCURRENT
).step(
name="get_customer_profile",
goal="Buscar perfil do cliente no banco de dados",
output_schema=CustomerProfile,
execution_mode=ExecutionMode.CONCURRENT,
depends_on=[] # Roda em paralelo com sentiment
).step(
name="escalate_critical",
goal="Escalar para gerência casos críticos",
depends_on=["analyze_sentiment", "get_customer_profile"],
condition=Or(
IsLessThan("analyze_sentiment.score", 3.0),
And(
IsLessThan("analyze_sentiment.score", 5.0),
IsGreaterThan("get_customer_profile.tier", "gold")
)
),
output_schema=EscalationTicket
).step(
name="auto_response_positive",
goal="Resposta automática para feedback positivo",
depends_on=["analyze_sentiment"],
condition=And(
IsGreaterThan("analyze_sentiment.score", 8.0),
IsGreaterThan("analyze_sentiment.confidence", 0.8)
)
).step(
name="assign_to_specialist",
goal="Atribuir para especialista técnico",
depends_on=["analyze_sentiment"],
condition=Or(
Contains("analyze_sentiment.keywords", "bug"),
Contains("analyze_sentiment.keywords", "erro"),
Contains("analyze_sentiment.keywords", "falha")
)
).step(
name="standard_response",
goal="Resposta padrão para casos neutros",
depends_on=["analyze_sentiment"],
condition=And(
IsGreaterThan("analyze_sentiment.score", 4.0),
IsLessThan("analyze_sentiment.score", 8.0)
)
).build()
Visualização do Fluxo
Aqui está como o pipeline funciona na prática:
┌─────────────────────── CUSTOMER FEEDBACK PIPELINE ───────────────────────────┐
│ │
│ INPUT: "O sistema está muito lento e bugado, preciso de ajuda urgente!" │
│ │
│ ┌──────────────────┐ ┌────────────────────┐ │
│ │ analyze_sentiment│ │get_customer_profile│ │
│ │ │ │ │ │
│ │ CONCURRENT │ │ CONCURRENT │ │
│ │ │ │ │ │
│ │ Score: 2.3 │ │ Tier: platinum │ │
│ │ Keywords: [bug, │ │ Interactions: 47 │ │
│ │ lento, ajuda] │ │ │ │
│ └──────────────────┘ └────────────────────┘ │
│ │ │ │
│ └────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ DECISION POINT │ │
│ │ │ │
│ │ IF score < 3.0 │ │
│ │ OR (score < 5.0 │ │
│ │ AND tier > gold)│ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ ┌────────────────────┐ │
│ │escalate_critical│ │assign_to_specialist│ │
│ │ │ │ │ │
│ │ - EXECUTED │ │ - EXECUTED │ │
│ │ │ │ │ │
│ │ Priority: HIGH │ │ Department: Tech │ │
│ │ Dept: Management│ │ ETA: 2 hours │ │
│ └─────────────────┘ └────────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │auto_response_pos│ │standard_response│ │
│ │ │ │ │ │
│ │ - SKIPPED │ │ - SKIPPED │ │
│ │ │ │ │ │
│ │ (score too low) │ │ (score too low) │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ OUTPUT: Escalated to management + Technical specialist assigned │
└──────────────────────────────────────────────────────────────────────────────┘
Execução e Monitoramento
# Configuração do executor
config = TAgentConfig(
model="openrouter/google/gemini-2.5-flash",
verbose=True
)
executor_config = PipelineExecutorConfig(
max_concurrent_steps=5,
enable_persistence=True,
retry_failed_steps=True,
max_retries=2
)
# Execução com UI em tempo real
executor = PipelineExecutor(pipeline, config, executor_config)
result = await executor.execute()
📊 Pipeline Dashboard:
Casos de Uso
1. Pipeline de Análise de Código
code_review_pipeline = PipelineBuilder(
name="code_review_pipeline",
description="Pipeline automatizado de code review"
).step(
name="last_commit",
goal="Executar análise estática do código",
execution_mode=ExecutionMode.CONCURRENT,
tools=[last_commit_tool]
).step(
name="static_analysis",
goal="Executar análise estática do código",
execution_mode=ExecutionMode.CONCURRENT,
dependes_on=["last_commit"],
read_data=[
"last_commit.diff",
]
).step(
name="security_scan",
goal="Escanear vulnerabilidades de segurança",
execution_mode=ExecutionMode.CONCURRENT,
dependes_on=["last_commit"],
read_data=[
"last_commit.diff",
]
).step(
name="performance_check",
goal="Verificar performance e complexidade",
execution_mode=ExecutionMode.CONCURRENT,
dependes_on=["last_commit"],
read_data=[
"last_commit.diff",
]
).step(
name="lint",
goal="Roda lint",
execution_mode=ExecutionMode.CONCURRENT,
dependes_on=["last_commit"],
read_data=[
"last_commit.diff"
]
).step(
name="generate_report",
goal="Gerar relatório consolidado",
depends_on=["static_analysis", "security_scan", "performance_check", "lint"],
condition=DataExists("static_analysis")
).step(
name="send_report_email",
goal="Envia email com report e sugestões para o dev@some_team.com",
depends_on=["generate_report"],
read_data=[
"generate_report.report"
],
tools=[send_email]
).step(
name="auto_fix_suggestions",
goal="Sugerir correções automáticas",
depends_on=["generate_report"],
condition=IsLessThan("generate_report.severity_score", 7.0)
).step(
name="send_suggestion_slack",
goal="Envia uma mensagem no canal #dev_review no slack",
depends_on=["auto_fix_suggestions"],
condition=DataExists("auto_fix_suggestions"),
read_data=[
"auto_fix_suggestions.suggestions"
],
tools=[
slack_channel_message
]
).build()
2. Pipeline de Processamento de Dados
data_pipeline = PipelineBuilder(
name="data_processing_pipeline",
description="Pipeline ETL inteligente"
).step(
name="validate_data",
goal="Validar qualidade dos dados de entrada"
).step(
name="clean_data",
goal="Limpar e normalizar dados",
depends_on=["validate_data"],
condition=IsGreaterThan("validate_data.quality_score", 0.7)
).step(
name="transform_data",
goal="Aplicar transformações necessárias",
depends_on=["clean_data"]
).step(
name="generate_insights",
goal="Gerar insights usando IA",
depends_on=["transform_data"],
execution_mode=ExecutionMode.CONCURRENT
).step(
name="create_dashboard",
goal="Criar dashboard interativo",
depends_on=["generate_insights"]
).build()
Recursos Avançados
1. Execução Condicional Inteligente
# Condições complexas com operadores lógicos
condition = And(
IsGreaterThan("sentiment.score", 8.0),
Or(
Contains("keywords", "excelente"),
Contains("keywords", "perfeito")
),
Not(IsEmpty("customer.email"))
)
2. Injeção de Dados Automática
.step(
name="generate_response",
goal="Gerar resposta personalizada",
depends_on=["analyze_sentiment", "get_customer_profile"],
read_data=[
"analyze_sentiment.score",
"get_customer_profile.tier",
"analyze_sentiment.keywords"
]
)
3. Tratamento de Erros Robusto
executor_config = PipelineExecutorConfig(
max_concurrent_steps=3,
retry_failed_steps=True,
max_retries=3,
retry_delay=1.0,
enable_persistence=True
)
Comparação com Outras Soluções
Recurso | TAgent Pipelines |
---|---|
Execução Condicional | ✅ Avançada |
Concorrência | ✅ Nativa |
Type Safety | ✅ Pydantic |
Retry Logic | ✅ Configurável |
Dependency Graph | ✅ Automático |
Instalação e Começando
# Instalar TAgent com suporte a pipelines
pip install "tagent[pipeline]"
# Ou instalar tudo
pip install "tagent[all]"
# Seu primeiro pipeline
from tagent.pipeline import PipelineBuilder
from tagent.pipeline.executor import PipelineExecutor
from tagent.config import TAgentConfig
from pydantic import BaseModel, Field
def fetch_content():
return "Tá ruim o dia hoje"
class ContentOutput(BaseModel):
value: str = Field(..., description="Conteúdo do texto")
pipeline = PipelineBuilder(
name="meu_primeiro_pipeline",
description="Análise simples de texto"
).step(
name="content",
goal="Carregue o texto atual",
tool=[fetch_content],
output_schema: ContentOutput
).step(
name="summarize",
goal="Deixe o texto com tom positivo",
depends_on=["analyze"],
read_data=["content.value"] # Inject this data direct on the context
).build()
config = TAgentConfig(model="any model from litellm")
executor = PipelineExecutor(pipeline, config)
result = await executor.execute()
O TAgent Pipelines tem uma forma interessante para lidar com agentes de IA. Com execução condicional, concorrência nativa e uma API intuitiva, você pode criar workflows sofisticados sem a complexidade tradicional.
Links:
O que vocês acham?