Capítulo 15: Performance y Optimización
Capítulo 15: Performance y Optimización
Un agente lento o costoso no llega a producción, o si llega, no sobrevive. Este capítulo cubre todas las técnicas para construir agentes que sean rápidos, económicos y escalables — sin sacrificar calidad.
La optimización de agentes tiene tres dimensiones que compiten entre sí:
graph TD
T[Triángulo de Trade-offs]
T --> V[Velocidad\ntime-to-complete]
T --> C[Costo\nUSD por query]
T --> Q[Calidad\nresultados correctos]
V <-->|"Modelo rápido = peor calidad"| Q
C <-->|"Modelo barato = peor calidad"| Q
V <-->|"Más rápido = más tokens = más costo"| C
La clave es entender cuándo cada dimensión importa más para tu caso de uso específico.
1. Métricas de Performance de Agentes
Las métricas correctas para medir
Antes de optimizar, necesitas medir. Las métricas de un agente son distintas a las de una API REST simple:
Latencia:
- Time-to-first-token (TTFT): Cuánto tarda en llegar el primer carácter de respuesta. Crítico para UX en tiempo real.
- Time-to-complete (TTC): Tiempo total hasta que el agente termina su tarea. Crítico para batch jobs.
- Tool latency: Cuánto tiempo pasan las tool calls (pueden dominar el tiempo total).
Throughput:
- Queries por segundo: Cuántas tareas puede procesar en paralelo.
- Tokens por segundo: Velocidad de generación del modelo.
Costo:
- USD por query: El costo directo de una invocación.
- USD por herramienta: Qué herramientas consumen más tokens (inputs + outputs).
- USD por token de output vs. input: Los tokens de output cuestan 3-5x más que input.
Cómo medir: instrumentación básica
Python:
from claude_code_sdk import query, ClaudeCodeOptions
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AgentMetrics:
start_time: float = field(default_factory=time.time)
first_token_time: Optional[float] = None
end_time: Optional[float] = None
total_cost_usd: float = 0.0
total_turns: int = 0
tool_calls: list[str] = field(default_factory=list)
@property
def ttft_ms(self) -> Optional[float]:
if self.first_token_time:
return (self.first_token_time - self.start_time) * 1000
return None
@property
def ttc_ms(self) -> Optional[float]:
if self.end_time:
return (self.end_time - self.start_time) * 1000
return None
def report(self) -> str:
return (
f"TTFT: {self.ttft_ms:.0f}ms | "
f"TTC: {self.ttc_ms:.0f}ms | "
f"Costo: ${self.total_cost_usd:.4f} | "
f"Turns: {self.total_turns} | "
f"Tools: {', '.join(self.tool_calls)}"
)
async def measured_agent(task: str, options: ClaudeCodeOptions) -> tuple[str, AgentMetrics]:
metrics = AgentMetrics()
result = ""
async for message in query(prompt=task, options=options):
# Detectar primer token (primera respuesta del modelo)
if metrics.first_token_time is None:
metrics.first_token_time = time.time()
# Contabilizar costo
if hasattr(message, 'cost_usd') and message.cost_usd:
metrics.total_cost_usd += message.cost_usd
# Contar turns y herramientas
if hasattr(message, 'num_turns'):
metrics.total_turns = message.num_turns
if hasattr(message, 'result'):
result = message.result
metrics.end_time = time.time()
print(f"[METRICS] {metrics.report()}")
return result, metrics
TypeScript:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface AgentMetrics {
startTime: number;
firstTokenTime?: number;
endTime?: number;
totalCostUsd: number;
totalTurns: number;
toolCalls: string[];
}
function createMetrics(): AgentMetrics {
return {
startTime: Date.now(),
totalCostUsd: 0,
totalTurns: 0,
toolCalls: [],
};
}
function reportMetrics(m: AgentMetrics): string {
const ttft = m.firstTokenTime ? m.firstTokenTime - m.startTime : null;
const ttc = m.endTime ? m.endTime - m.startTime : null;
return `TTFT: ${ttft?.toFixed(0) ?? "N/A"}ms | TTC: ${ttc?.toFixed(0) ?? "N/A"}ms | Costo: $${m.totalCostUsd.toFixed(4)}`;
}
async function measuredAgent(task: string, options: ClaudeCodeOptions): Promise<[string, AgentMetrics]> {
const metrics = createMetrics();
let result = "";
for await (const message of query({ prompt: task, options })) {
if (!metrics.firstTokenTime) {
metrics.firstTokenTime = Date.now();
}
if (message.type === "result") {
result = message.result;
}
}
metrics.endTime = Date.now();
console.log(`[METRICS] ${reportMetrics(metrics)}`);
return [result, metrics];
}
Benchmark: línea de base
Antes de optimizar, establece una línea de base ejecutando la misma tarea 10 veces y midiendo:
- P50 (mediana): el caso típico
- P95: el caso en el que el 95% de requests son más rápidos
- P99: el peor caso frecuente
2. Selección de Modelo por Tarea
Entendiendo la familia de modelos Claude
La selección de modelo es la optimización de mayor impacto. Un error aquí puede multiplicar por 10x tu costo o reducir a la mitad la calidad.
Claude Opus 4: El más capaz.
- Mejor para: razonamiento complejo, arquitectura de sistemas, análisis crítico, tareas de múltiples pasos con interdependencias
- Cuándo vale el costo extra: cuando un error cuesta más que el modelo más barato, cuando se necesita razonamiento profundo
- No usar para: tareas repetitivas simples, formateo de datos, extracción de información estructurada
Claude Sonnet 4: El punto dulce.
- Mejor para: la mayoría de tareas de desarrollo, análisis de código, generación de código moderadamente complejo
- Es el default para la mayoría de casos
- Equilibrio óptimo entre costo y calidad
Claude Haiku 3.5: El rápido y barato.
- Mejor para: clasificación, extracción de datos simples, resúmenes cortos, tareas con plantilla
- No usar para: código complejo, razonamiento multi-paso, toma de decisiones críticas
Router de modelos basado en complejidad
Python:
from claude_code_sdk import query, ClaudeCodeOptions
import re
def estimate_task_complexity(task: str) -> str:
"""
Estima la complejidad de una tarea y retorna el modelo recomendado.
Retorna: "claude-opus-4-5" | "claude-sonnet-4-5" | "claude-haiku-3-5"
"""
task_lower = task.lower()
word_count = len(task.split())
# Indicadores de alta complejidad -> Opus
high_complexity_signals = [
r"\barchitectura\b",
r"\bdiseño\s+de\s+sistema",
r"\boptimización\s+de\s+algoritmo",
r"\brefactorización\s+completa",
r"\bseguridad\s+(crítica|producción)",
r"\bmigración",
r"\banalyze\s+(entire|whole|complete|all)",
r"\bmultiple\s+files",
r"\bcomplex\b.*\barchitecture\b",
]
for signal in high_complexity_signals:
if re.search(signal, task_lower):
return "claude-opus-4-5"
# Indicadores de baja complejidad -> Haiku
low_complexity_signals = [
r"\bclasifica\b",
r"\bextrae\b.*\b(campo|dato|valor)\b",
r"\bformatea\b",
r"\bresume\b",
r"\btraduce\b",
r"\bconvierte\b.*\bformato\b",
r"^(yes|no|true|false)", # Pregunta de sí/no
]
# Tarea corta con baja complejidad
if word_count < 20:
for signal in low_complexity_signals:
if re.search(signal, task_lower):
return "claude-haiku-3-5"
# Default: Sonnet para la mayoría de tareas
return "claude-sonnet-4-5"
async def routed_agent(task: str, workspace: str) -> str:
model = estimate_task_complexity(task)
print(f"[ROUTER] Tarea asignada a: {model}")
options = ClaudeCodeOptions(
model=model,
cwd=workspace,
allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
max_turns=20,
)
result = ""
async for message in query(prompt=task, options=options):
if hasattr(message, 'result'):
result = message.result
return result
# Ejemplos de routing
# "Clasifica este archivo como config o código" -> Haiku
# "Refactoriza la clase UserService" -> Sonnet
# "Diseña la arquitectura de microservicios para este monolito" -> Opus
TypeScript:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
type ModelId = "claude-opus-4-5" | "claude-sonnet-4-5" | "claude-haiku-3-5";
function estimateTaskComplexity(task: string): ModelId {
const taskLower = task.toLowerCase();
const wordCount = task.split(/\s+/).length;
const highComplexitySignals = [
/\barchitecture\b/,
/\bsystem design\b/,
/\bcomplete refactor/,
/\bmigration\b/,
/\bmultiple files\b/,
];
for (const signal of highComplexitySignals) {
if (signal.test(taskLower)) return "claude-opus-4-5";
}
const lowComplexitySignals = [
/\bclassify\b/,
/\bextract\b.*\b(field|value)\b/,
/\bformat\b/,
/\bsummarize\b/,
/\btranslate\b/,
];
if (wordCount < 20) {
for (const signal of lowComplexitySignals) {
if (signal.test(taskLower)) return "claude-haiku-3-5";
}
}
return "claude-sonnet-4-5";
}
async function routedAgent(task: string, workspace: string): Promise<string> {
const model = estimateTaskComplexity(task);
console.log(`[ROUTER] Modelo seleccionado: ${model}`);
const options: ClaudeCodeOptions = {
model,
cwd: workspace,
allowedTools: ["View", "GlobTool", "GrepTool", "Edit"],
maxTurns: 20,
};
let result = "";
for await (const message of query({ prompt: task, options })) {
if (message.type === "result") {
result = message.result;
}
}
return result;
}
Estimación de costo antes de ejecutar
# Precios aproximados por millón de tokens (verificar en anthropic.com)
MODEL_PRICING = {
"claude-opus-4-5": {"input": 15.0, "output": 75.0},
"claude-sonnet-4-5": {"input": 3.0, "output": 15.0},
"claude-haiku-3-5": {"input": 0.25, "output": 1.25},
}
def estimate_cost(prompt: str, model: str, expected_output_tokens: int = 500) -> float:
"""Estimación rápida del costo antes de ejecutar."""
# Aprox: 1 token = 4 caracteres
input_tokens = len(prompt) / 4
pricing = MODEL_PRICING.get(model, MODEL_PRICING["claude-sonnet-4-5"])
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (expected_output_tokens / 1_000_000) * pricing["output"]
return input_cost + output_cost
# Ejemplo
cost = estimate_cost(
prompt="Analiza este archivo de 10KB de código...",
model="claude-opus-4-5",
expected_output_tokens=1000
)
print(f"Costo estimado: ${cost:.4f}")
3. Prompt Caching
Cómo funciona el prompt caching de Anthropic
El prompt caching permite guardar en caché partes del contexto que se envían repetidamente. Cuando el mismo contenido se envía de nuevo, Anthropic lo sirve desde caché en lugar de procesarlo de nuevo.
Ahorro típico:
- Escritura en caché: 25% del costo de input normal
- Lectura desde caché: 10% del costo de input normal
- Para system_prompts largos que se repiten: hasta 90% de ahorro en input tokens
Reglas de invalidación de caché
El caché se invalida cuando:
- El contenido cambia (cualquier carácter diferente)
- El modelo cambia
- La posición del breakpoint cambia
- Han pasado más de 5 minutos sin uso (por defecto)
Anti-patrón crítico: system_prompt dinámico
# MAL: El timestamp hace que el caché se invalide en cada request
async def bad_agent(task: str, user_id: str):
options = ClaudeCodeOptions(
system_prompt=f"""
Eres un asistente de código.
Usuario actual: {user_id}
Timestamp: {datetime.now()} # <- Esto invalida el caché cada segundo
Reglas: [... 5000 tokens de reglas ...]
"""
)
Correcto: separar la parte estática de la dinámica
# BIEN: La parte estática (larga) va en el system_prompt cacheado
# La parte dinámica va en el prompt del usuario
STATIC_SYSTEM_PROMPT = """
Eres un asistente experto en Python y TypeScript.
REGLAS DE CÓDIGO:
[... 5000 tokens de reglas estáticas que nunca cambian ...]
PATRONES A SEGUIR:
[... más reglas estáticas ...]
""" # Este prompt largo se cacheará
async def good_agent(task: str, user_id: str):
options = ClaudeCodeOptions(
system_prompt=STATIC_SYSTEM_PROMPT, # Siempre igual = siempre cacheado
max_turns=20,
)
# La info dinámica va en el prompt, no en el system_prompt
full_task = f"[Usuario: {user_id}]\n\nTarea: {task}"
async for message in query(prompt=full_task, options=options):
if hasattr(message, 'result'):
print(message.result)
Ejemplo completo: agente con system_prompt cacheado
Python:
from claude_code_sdk import query, ClaudeCodeOptions
import os
# El system_prompt se define una sola vez y nunca cambia
CODE_REVIEW_SYSTEM_PROMPT = """
Eres un experto revisor de código con 20 años de experiencia en Python, TypeScript y sistemas distribuidos.
## PRINCIPIOS QUE APLICAS
### Seguridad
- Detectas inyecciones SQL, XSS, CSRF, y otros vectores de ataque OWASP Top 10
- Identificas hardcoded secrets, tokens en código fuente
- Revisas validación de inputs y sanitización de outputs
- Detectas problemas de autorización y autenticación
### Performance
- Identifies N+1 queries en ORMs
- Detectas loops innecesarios y complejidad algorítmica subóptima
- Señalas memoria leaks evidentes
- Revisas uso ineficiente de recursos
### Mantenibilidad
- Aplicas principios SOLID: SRP, OCP, LSP, ISP, DIP
- Detectas código duplicado (DRY violations)
- Evalúas complejidad ciclomática
- Revisas naming conventions y documentación
### Patrones de arquitectura
- Detectas violaciones de arquitectura hexagonal
- Identificas acoplamiento excesivo entre módulos
- Señalas dependencias circulares
- Revisas separación de responsabilidades
## FORMATO DE RESPUESTA
Para cada problema encontrado:
1. **Severidad**: CRITICAL / HIGH / MEDIUM / LOW / INFO
2. **Categoría**: Security / Performance / Maintainability / Architecture
3. **Descripción**: Qué está mal y por qué es un problema
4. **Línea**: Dónde está el problema (si aplica)
5. **Sugerencia**: Cómo corregirlo
Al final, un resumen con:
- Total de issues por severidad
- Puntuación general (0-100)
- Top 3 prioridades
## ESTILO
- Sé directo y específico, sin rodeos
- Incluye ejemplos de código corregido cuando sea útil
- Si el código está bien, dilo claramente
"""
async def cached_code_review(file_path: str, specific_concern: str = "") -> str:
"""
Revisa código usando prompt caching.
El system_prompt largo se cacheará después de la primera llamada.
"""
options = ClaudeCodeOptions(
system_prompt=CODE_REVIEW_SYSTEM_PROMPT, # Siempre idéntico = cacheado
allowed_tools=["View", "GlobTool", "GrepTool"],
max_turns=5,
model="claude-sonnet-4-5",
)
prompt = f"Revisa el código en {file_path}."
if specific_concern:
prompt += f"\n\nPresta especial atención a: {specific_concern}"
result = ""
async for message in query(prompt=prompt, options=options):
if hasattr(message, 'result'):
result = message.result
return result
# Uso: la primera llamada paga el costo completo de tokenización del system_prompt
# Las llamadas siguientes usan el caché (mucho más barato)
async def review_multiple_files(file_paths: list[str]) -> dict[str, str]:
results = {}
for path in file_paths:
# Cada llamada usa el mismo system_prompt cacheado
results[path] = await cached_code_review(path)
print(f"Revisado: {path}")
return results
TypeScript:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
const CODE_REVIEW_SYSTEM_PROMPT = `
Eres un experto revisor de código con 20 años de experiencia.
## PRINCIPIOS QUE APLICAS
### Seguridad
- Detectas inyecciones SQL, XSS, CSRF y otros vectores OWASP Top 10
- Identificas hardcoded secrets, tokens en código fuente
- Revisas validación de inputs
### Performance
- Identificas N+1 queries en ORMs
- Detectas loops innecesarios y complejidad algorítmica subóptima
- Señalas memoria leaks evidentes
### Mantenibilidad
- Aplicas principios SOLID
- Detectas código duplicado (DRY violations)
- Evalúas complejidad ciclomática
## FORMATO DE RESPUESTA
Para cada problema: Severidad | Categoría | Descripción | Línea | Sugerencia
Al final: resumen con puntuación 0-100 y top 3 prioridades
`.trim();
async function cachedCodeReview(filePath: string, specificConcern?: string): Promise<string> {
const options: ClaudeCodeOptions = {
systemPrompt: CODE_REVIEW_SYSTEM_PROMPT, // Siempre idéntico = cacheado
allowedTools: ["View", "GlobTool", "GrepTool"],
maxTurns: 5,
model: "claude-sonnet-4-5",
};
let prompt = `Revisa el código en ${filePath}.`;
if (specificConcern) {
prompt += `\n\nPresta especial atención a: ${specificConcern}`;
}
let result = "";
for await (const message of query({ prompt, options })) {
if (message.type === "result") {
result = message.result;
}
}
return result;
}
async function reviewMultipleFiles(filePaths: string[]): Promise<Record<string, string>> {
const results: Record<string, string> = {};
for (const path of filePaths) {
results[path] = await cachedCodeReview(path);
console.log(`Revisado: ${path}`);
}
return results;
}
Midiendo el ahorro del caché
async def measure_cache_savings(task: str, options: ClaudeCodeOptions, repetitions: int = 5):
"""Mide el ahorro del caché en múltiples repeticiones."""
costs = []
for i in range(repetitions):
total_cost = 0.0
async for message in query(prompt=task, options=options):
if hasattr(message, 'cost_usd') and message.cost_usd:
total_cost += message.cost_usd
costs.append(total_cost)
print(f"Repetición {i+1}: ${total_cost:.6f}")
print(f"\nPrimera llamada (sin caché): ${costs[0]:.6f}")
print(f"Promedio con caché: ${sum(costs[1:]) / len(costs[1:]):.6f}")
print(f"Ahorro: {(1 - sum(costs[1:]) / len(costs[1:]) / costs[0]) * 100:.1f}%")
4. Streaming para Latencia Percibida
Por qué streaming mejora la UX
Sin streaming, el usuario ve una pantalla en blanco hasta que el agente termina. Con streaming, ve el output aparecer progresivamente. Para tareas de 30 segundos, esto hace una diferencia enorme en la percepción.
Python - streaming a terminal:
from claude_code_sdk import query, ClaudeCodeOptions
import sys
async def streaming_agent(task: str) -> str:
options = ClaudeCodeOptions(
allowed_tools=["View", "GlobTool"],
max_turns=10,
)
full_result = ""
print("Agente: ", end="", flush=True)
async for message in query(prompt=task, options=options):
# Mostrar progreso mientras el agente trabaja
if hasattr(message, 'type'):
if message.type == "assistant":
# Mostrar el texto a medida que llega
if hasattr(message, 'message') and hasattr(message.message, 'content'):
for block in message.message.content:
if hasattr(block, 'text'):
print(block.text, end="", flush=True)
elif message.type == "tool_use":
# Notificar al usuario qué herramienta se está usando
tool_name = getattr(message, 'tool_name', 'unknown')
print(f"\n[Usando: {tool_name}] ", end="", flush=True)
elif message.type == "result":
full_result = message.result
print() # Nueva línea al final
return full_result
TypeScript - streaming a SSE:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import { Response } from "express";
async function streamAgentToSSE(task: string, res: Response): Promise<void> {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
const options: ClaudeCodeOptions = {
allowedTools: ["View", "GlobTool"],
maxTurns: 10,
};
for await (const message of query({ prompt: task, options })) {
if (message.type === "assistant") {
const content = (message as any).message?.content ?? [];
for (const block of content) {
if (block.type === "text") {
res.write(`data: ${JSON.stringify({ type: "text", text: block.text })}\n\n`);
}
}
} else if (message.type === "result") {
res.write(`data: ${JSON.stringify({ type: "done", result: message.result })}\n\n`);
}
}
res.end();
}
Cuándo el streaming NO ayuda
El streaming no mejora el TTC (tiempo total). Solo mejora la latencia percibida. No usar streaming para:
- Batch jobs que procesan cientos de archivos (el output no se muestra)
- Pipelines donde el resultado del agente se pasa a otro sistema
- Tests automatizados
5. Paralelismo y Concurrencia
El problema con queries secuenciales
sequenceDiagram
participant C as Cliente
participant A1 as Agente 1
participant A2 as Agente 2
participant A3 as Agente 3
Note over C,A3: Secuencial: 30 segundos total
C->>A1: Query 1
A1-->>C: Respuesta 1 (10s)
C->>A2: Query 2
A2-->>C: Respuesta 2 (10s)
C->>A3: Query 3
A3-->>C: Respuesta 3 (10s)
Note over C,A3: Paralelo: ~10 segundos total
C->>A1: Query 1
C->>A2: Query 2
C->>A3: Query 3
A1-->>C: Respuesta 1
A2-->>C: Respuesta 2
A3-->>C: Respuesta 3
asyncio.gather para múltiples queries
Python:
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio
from typing import Any
async def single_agent_task(task: str, task_id: str) -> tuple[str, str]:
"""Ejecuta una tarea individual."""
options = ClaudeCodeOptions(
allowed_tools=["View", "GlobTool", "GrepTool"],
max_turns=10,
model="claude-haiku-3-5", # Haiku para tareas simples paralelas
)
result = ""
async for message in query(prompt=task, options=options):
if hasattr(message, 'result'):
result = message.result
return task_id, result
async def parallel_agent_queries(tasks: list[tuple[str, str]]) -> dict[str, str]:
"""
Ejecuta múltiples queries en paralelo.
tasks: lista de (task_id, task_prompt)
"""
coroutines = [single_agent_task(task, task_id) for task_id, task in tasks]
results = await asyncio.gather(*coroutines, return_exceptions=True)
output = {}
for result in results:
if isinstance(result, Exception):
print(f"Error en una tarea: {result}")
else:
task_id, task_result = result
output[task_id] = task_result
return output
# Ejemplo: analizar múltiples archivos en paralelo
async def analyze_files_parallel(file_paths: list[str]) -> dict[str, str]:
tasks = [
(path, f"Analiza {path} y lista los problemas de seguridad")
for path in file_paths
]
return await parallel_agent_queries(tasks)
Semaphore para limitar concurrencia (rate limit)
La API de Anthropic tiene límites de requests por minuto. Un semaphore evita sobrepasarlos:
Python:
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def process_with_concurrency_limit(
tasks: list[str],
max_concurrent: int = 5,
workspace: str = "/workspace"
) -> list[str]:
"""
Procesa tareas en paralelo con límite de concurrencia.
max_concurrent: máximo de queries simultáneos a la API.
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_task(task: str) -> str:
async with semaphore: # Solo `max_concurrent` tareas corren a la vez
options = ClaudeCodeOptions(
cwd=workspace,
allowed_tools=["View", "GlobTool"],
max_turns=5,
model="claude-haiku-3-5",
)
result = ""
async for message in query(prompt=task, options=options):
if hasattr(message, 'result'):
result = message.result
return result
return await asyncio.gather(*[bounded_task(task) for task in tasks])
# Uso: procesar 100 archivos con máximo 5 en paralelo
async def main():
files = [f"archivo_{i}.py" for i in range(100)]
tasks = [f"Revisa {f} por vulnerabilidades de seguridad" for f in files]
results = await process_with_concurrency_limit(tasks, max_concurrent=5)
print(f"Procesados {len(results)} archivos")
Ejemplo: procesar 100 archivos en paralelo
Python completo:
from claude_code_sdk import query, ClaudeCodeOptions
import asyncio
import time
from pathlib import Path
async def analyze_single_file(
file_path: str,
semaphore: asyncio.Semaphore,
results: dict,
errors: dict
) -> None:
"""Analiza un archivo individual con control de concurrencia."""
async with semaphore:
try:
options = ClaudeCodeOptions(
allowed_tools=["View"],
max_turns=3,
model="claude-haiku-3-5", # Rápido y barato para análisis simple
)
file_result = ""
async for message in query(
prompt=f"Lista los imports y exportaciones principales de {file_path}",
options=options
):
if hasattr(message, 'result'):
file_result = message.result
results[file_path] = file_result
except Exception as e:
errors[file_path] = str(e)
async def analyze_codebase(
directory: str,
pattern: str = "**/*.py",
max_concurrent: int = 10
) -> dict[str, str]:
"""Analiza todos los archivos que coincidan con el patrón en paralelo."""
files = list(Path(directory).glob(pattern))
print(f"Analizando {len(files)} archivos con {max_concurrent} workers...")
semaphore = asyncio.Semaphore(max_concurrent)
results: dict[str, str] = {}
errors: dict[str, str] = {}
start_time = time.time()
tasks = [
analyze_single_file(str(f), semaphore, results, errors)
for f in files
]
await asyncio.gather(*tasks)
elapsed = time.time() - start_time
print(f"Completado en {elapsed:.1f}s")
print(f"Exitosos: {len(results)}, Errores: {len(errors)}")
return results
TypeScript: Promise.all con límite de concurrencia
TypeScript:
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
async function processBatch<T>(
items: T[],
processor: (item: T) => Promise<string>,
maxConcurrent: number
): Promise<Array<{ item: T; result?: string; error?: string }>> {
const results: Array<{ item: T; result?: string; error?: string }> = [];
// Dividir en chunks del tamaño de maxConcurrent
for (let i = 0; i < items.length; i += maxConcurrent) {
const chunk = items.slice(i, i + maxConcurrent);
const chunkResults = await Promise.allSettled(
chunk.map((item) => processor(item))
);
for (let j = 0; j < chunk.length; j++) {
const outcome = chunkResults[j];
if (outcome.status === "fulfilled") {
results.push({ item: chunk[j], result: outcome.value });
} else {
results.push({ item: chunk[j], error: outcome.reason?.message });
}
}
console.log(`Procesados ${Math.min(i + maxConcurrent, items.length)}/${items.length}`);
}
return results;
}
async function analyzeFiles(filePaths: string[]): Promise<void> {
const analyzeFile = async (filePath: string): Promise<string> => {
const options: ClaudeCodeOptions = {
allowedTools: ["View"],
maxTurns: 3,
model: "claude-haiku-3-5",
};
let result = "";
for await (const message of query({
prompt: `Analiza brevemente ${filePath}`,
options,
})) {
if (message.type === "result") result = message.result;
}
return result;
};
const results = await processBatch(filePaths, analyzeFile, 5);
console.log(`Analizados: ${results.filter((r) => r.result).length} archivos`);
}
Patrón producer/consumer
Para pipelines complejos donde el análisis alimenta acciones:
Python:
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def producer(file_queue: asyncio.Queue, files: list[str]):
"""Produce archivos para analizar."""
for file in files:
await file_queue.put(file)
# Señal de fin
for _ in range(5): # Un "poison pill" por worker
await file_queue.put(None)
async def consumer(
worker_id: int,
file_queue: asyncio.Queue,
result_queue: asyncio.Queue
):
"""Consume archivos, los analiza y pone resultados en result_queue."""
while True:
file = await file_queue.get()
if file is None:
break # Poison pill: terminar
options = ClaudeCodeOptions(
allowed_tools=["View"],
max_turns=3,
model="claude-haiku-3-5",
)
result = ""
async for message in query(
prompt=f"¿Tiene este archivo tests? Responde solo: SÍ o NO. Archivo: {file}",
options=options
):
if hasattr(message, 'result'):
result = message.result
await result_queue.put({"file": file, "result": result, "worker": worker_id})
file_queue.task_done()
async def producer_consumer_pipeline(files: list[str], num_workers: int = 5):
file_queue: asyncio.Queue = asyncio.Queue(maxsize=20)
result_queue: asyncio.Queue = asyncio.Queue()
# Iniciar producer y consumers en paralelo
producers = [asyncio.create_task(producer(file_queue, files))]
consumers = [
asyncio.create_task(consumer(i, file_queue, result_queue))
for i in range(num_workers)
]
await asyncio.gather(*producers)
await asyncio.gather(*consumers)
# Recolectar todos los resultados
results = []
while not result_queue.empty():
results.append(await result_queue.get())
return results
6. Optimización de Herramientas
Jerarquía de herramientas por costo de tokens
graph TD
A[Glob - buscar archivos] -->|"Muy barato: lista de paths"| B
B[Grep - buscar contenido] -->|"Barato: solo líneas que coinciden"| C
C[View con range - leer sección] -->|"Moderado: solo parte del archivo"| D
D[View completo - leer archivo] -->|"Costoso: todo el contenido"| E
E[Múltiples Reads] -->|"Muy costoso: n × contenido"| F
F[Bash - ejecutar y leer output] -->|"El más costoso y poderoso"| G[Resultado]
Grep antes de Read
Anti-patrón:
# MAL: Leer 50 archivos para encontrar la función
options = ClaudeCodeOptions(
system_prompt="Encuentra la función 'process_payment' y explica cómo funciona.",
allowed_tools=["View", "GlobTool"],
)
# El agente probablemente leerá muchos archivos antes de encontrar la función
Optimizado:
# BIEN: Guiar al agente para usar Grep primero
options = ClaudeCodeOptions(
system_prompt="""Cuando necesites encontrar código:
1. PRIMERO usa GrepTool para encontrar el archivo que contiene el código
2. LUEGO usa View con un rango de líneas para leer solo la parte relevante
3. NUNCA leas archivos completos si puedes usar Grep primero
""",
allowed_tools=["View", "GlobTool", "GrepTool"],
)
Profiling de uso de herramientas
from collections import defaultdict
from claude_code_sdk import query, ClaudeCodeOptions
class ToolProfiler:
def __init__(self):
self.tool_calls = defaultdict(int)
self.total_queries = 0
def record(self, messages: list):
self.total_queries += 1
for message in messages:
if hasattr(message, 'type') and message.type == "tool_use":
tool_name = getattr(message, 'tool_name', 'unknown')
self.tool_calls[tool_name] += 1
def report(self):
print(f"\n=== Tool Profiling Report ({self.total_queries} queries) ===")
for tool, count in sorted(self.tool_calls.items(), key=lambda x: -x[1]):
avg = count / self.total_queries
print(f" {tool}: {count} calls ({avg:.1f} avg/query)")
profiler = ToolProfiler()
async def profiled_agent(task: str) -> str:
options = ClaudeCodeOptions(
allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
max_turns=20,
)
all_messages = []
result = ""
async for message in query(prompt=task, options=options):
all_messages.append(message)
if hasattr(message, 'result'):
result = message.result
profiler.record(all_messages)
return result
Combinar consultas en una sola tool call
Ineficiente:
Turn 1: ¿Qué archivos hay en /src?
Turn 2: Lee /src/main.py
Turn 3: Lee /src/utils.py
Turn 4: Busca "import" en todos los archivos
Eficiente (guiar con el system_prompt):
options = ClaudeCodeOptions(
system_prompt="""Optimiza tu uso de herramientas:
- Usa GlobTool para listar todos los archivos que necesitas DE UNA VEZ
- Usa GrepTool para encontrar patrones en MÚLTIPLES archivos simultáneamente
- Lee solo las secciones relevantes, no archivos completos
- Planifica antes de ejecutar: determina qué archivos necesitas antes de leerlos
""",
allowed_tools=["View", "GlobTool", "GrepTool"],
max_turns=10, # Menos turns necesarios con uso eficiente de herramientas
)
7. Caching de Resultados
Cache layer sobre query()
Para queries idénticas (mismo task + mismo contexto), cachear el resultado evita llamadas repetidas a la API:
Python:
import hashlib
import json
import time
from claude_code_sdk import query, ClaudeCodeOptions
class AgentResultCache:
"""Cache en memoria para resultados de queries."""
def __init__(self, ttl_seconds: int = 3600):
self._cache: dict[str, tuple[str, float]] = {}
self.ttl_seconds = ttl_seconds
self.hits = 0
self.misses = 0
def _make_key(self, prompt: str, options: ClaudeCodeOptions) -> str:
"""Genera una clave única basada en el prompt y opciones relevantes."""
key_data = {
"prompt": prompt,
"model": getattr(options, 'model', 'default'),
"allowed_tools": sorted(getattr(options, 'allowed_tools', [])),
"system_prompt": getattr(options, 'system_prompt', ''),
}
key_str = json.dumps(key_data, sort_keys=True)
return hashlib.sha256(key_str.encode()).hexdigest()
def get(self, key: str) -> str | None:
if key in self._cache:
result, timestamp = self._cache[key]
if time.time() - timestamp < self.ttl_seconds:
self.hits += 1
return result
else:
del self._cache[key] # Expirado
self.misses += 1
return None
def set(self, key: str, result: str):
self._cache[key] = (result, time.time())
def stats(self) -> str:
total = self.hits + self.misses
hit_rate = (self.hits / total * 100) if total > 0 else 0
return f"Cache: {self.hits} hits, {self.misses} misses ({hit_rate:.1f}% hit rate)"
# Instancia global del caché
_cache = AgentResultCache(ttl_seconds=3600)
async def cached_query(prompt: str, options: ClaudeCodeOptions) -> str:
"""Wrapper de query() con caché de resultados."""
cache_key = _cache._make_key(prompt, options)
# Verificar caché primero
cached_result = _cache.get(cache_key)
if cached_result is not None:
print("[CACHE HIT]")
return cached_result
# Ejecutar query si no hay caché
print("[CACHE MISS] Ejecutando query...")
result = ""
async for message in query(prompt=prompt, options=options):
if hasattr(message, 'result'):
result = message.result
# Guardar en caché
_cache.set(cache_key, result)
return result
Redis para cache distribuida
import redis.asyncio as aioredis
import hashlib
import json
from claude_code_sdk import query, ClaudeCodeOptions
class RedisAgentCache:
def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
self.redis = aioredis.from_url(redis_url)
self.ttl = ttl
def _make_key(self, prompt: str, model: str) -> str:
data = json.dumps({"prompt": prompt, "model": model}, sort_keys=True)
return f"agent:cache:{hashlib.sha256(data.encode()).hexdigest()}"
async def get(self, prompt: str, model: str) -> str | None:
key = self._make_key(prompt, model)
value = await self.redis.get(key)
return value.decode() if value else None
async def set(self, prompt: str, model: str, result: str):
key = self._make_key(prompt, model)
await self.redis.setex(key, self.ttl, result.encode())
async def invalidate_by_file(self, file_path: str):
"""Invalida todas las entradas de caché relacionadas con un archivo."""
# Implementación: buscar keys con pattern relacionado al archivo
pattern = f"agent:cache:*"
async for key in self.redis.scan_iter(pattern):
# En una implementación real, guardarías metadata del archivo
pass
redis_cache = RedisAgentCache()
async def redis_cached_query(prompt: str, options: ClaudeCodeOptions) -> str:
model = getattr(options, 'model', 'claude-sonnet-4-5')
cached = await redis_cache.get(prompt, model)
if cached:
return cached
result = ""
async for message in query(prompt=prompt, options=options):
if hasattr(message, 'result'):
result = message.result
await redis_cache.set(prompt, model, result)
return result
TypeScript:
import { createClient } from "redis";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import { createHash } from "crypto";
const redisClient = createClient({ url: "redis://localhost:6379" });
function makeCacheKey(prompt: string, model: string): string {
const data = JSON.stringify({ prompt, model });
return `agent:cache:${createHash("sha256").update(data).digest("hex")}`;
}
async function redisCachedQuery(prompt: string, options: ClaudeCodeOptions): Promise<string> {
const model = options.model ?? "claude-sonnet-4-5";
const key = makeCacheKey(prompt, model);
const cached = await redisClient.get(key);
if (cached) {
console.log("[CACHE HIT]");
return cached;
}
let result = "";
for await (const message of query({ prompt, options })) {
if (message.type === "result") {
result = message.result;
}
}
await redisClient.setEx(key, 3600, result);
return result;
}
8. Batch Processing
Agrupar múltiples tareas pequeñas en una query
En lugar de hacer N queries para N archivos, a veces es más eficiente hacer 1 query con N archivos:
Python:
from claude_code_sdk import query, ClaudeCodeOptions
import json
async def batch_classify_files(file_paths: list[str], workspace: str) -> dict[str, str]:
"""
Clasifica múltiples archivos en una sola query (batch).
Más eficiente que N queries individuales cuando los archivos son pequeños.
"""
# Leer los archivos localmente para incluirlos en el prompt
files_content = {}
for path in file_paths[:10]: # Limitar para no exceder context window
try:
with open(f"{workspace}/{path}") as f:
content = f.read()[:500] # Solo primeros 500 chars
files_content[path] = content
except Exception:
pass
files_json = json.dumps(files_content, indent=2)
options = ClaudeCodeOptions(
allowed_tools=[], # Sin herramientas - tenemos el contenido en el prompt
max_turns=3,
model="claude-haiku-3-5",
)
prompt = f"""Para cada archivo a continuación, clasifícalo como uno de:
- "config" (archivos de configuración)
- "test" (archivos de prueba)
- "business_logic" (lógica de negocio)
- "utility" (utilidades/helpers)
- "ui" (componentes de interfaz)
Responde en JSON: {{"archivo.py": "tipo", ...}}
ARCHIVOS:
{files_json}
"""
result = ""
async for message in query(prompt=prompt, options=options):
if hasattr(message, 'result'):
result = message.result
try:
return json.loads(result)
except json.JSONDecodeError:
return {}
# Comparación de enfoque
# Batch (1 query): $0.002, 3 segundos para 10 archivos
# Individual (10 queries): $0.010, 30 segundos para 10 archivos
Cuándo batching es más económico vs. paralelo
graph TD
A[¿Cuántos archivos?] -->|"< 10 archivos pequeños"| B[Batch: 1 query]
A -->|"10-100 archivos"| C[Paralelo con semaphore]
A -->|"> 100 archivos"| D[Producer/Consumer Pipeline]
B -->|"Pro: 1 sola query\nCon: context window limitado"| E[Resultado]
C -->|"Pro: todos en paralelo\nCon: N queries"| E
D -->|"Pro: escalable\nCon: más complejidad"| E
Ejemplo completo: procesar 1000 archivos eficientemente
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def process_1000_files_efficiently(directory: str) -> dict[str, str]:
"""
Estrategia óptima para procesar 1000 archivos:
1. Agrupamos en batches de 10 (1 query por batch)
2. Ejecutamos 10 batches en paralelo
3. Total: 100 queries en paralelo, cada una con 10 archivos
"""
all_files = list(Path(directory).glob("**/*.py"))
print(f"Total archivos: {len(all_files)}")
BATCH_SIZE = 10
MAX_CONCURRENT_BATCHES = 10
# Dividir en batches
batches = [
all_files[i:i + BATCH_SIZE]
for i in range(0, len(all_files), BATCH_SIZE)
]
semaphore = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)
async def process_batch(batch: list[Path]) -> dict[str, str]:
async with semaphore:
batch_content = {}
for f in batch:
try:
content = f.read_text()[:300]
batch_content[str(f.name)] = content
except Exception:
pass
options = ClaudeCodeOptions(
allowed_tools=[],
max_turns=2,
model="claude-haiku-3-5",
)
prompt = f"""Clasifica brevemente cada archivo (una línea por archivo):
{', '.join(batch_content.keys())}
Responde: nombre_archivo: descripción_una_linea"""
result = ""
async for message in query(prompt=prompt, options=options):
if hasattr(message, 'result'):
result = message.result
# Parsear resultado simple
classifications = {}
for line in result.strip().split('\n'):
if ':' in line:
parts = line.split(':', 1)
if len(parts) == 2:
classifications[parts[0].strip()] = parts[1].strip()
return classifications
results: dict[str, str] = {}
batch_results = await asyncio.gather(*[process_batch(b) for b in batches])
for batch_result in batch_results:
results.update(batch_result)
print(f"Procesados {len(results)} archivos")
return results
9. Optimización de Tokens
Reducir el system_prompt a lo esencial
Cada token en el system_prompt se paga en cada query. Un system_prompt de 2000 tokens vs. uno de 200 tokens puede significar 10x más gasto:
# Versión no optimizada: 2000 tokens
BAD_SYSTEM_PROMPT = """
Eres un asistente de programación muy experimentado que ha trabajado durante muchos años
en la industria del software. Tienes experiencia en Python, JavaScript, TypeScript, Go,
Rust, Java, C++, y muchos otros lenguajes. Siempre escribes código limpio y bien documentado.
Sigues las mejores prácticas de la industria. Aplicas los principios SOLID. Escribes tests.
[... 1800 tokens más de instrucciones redundantes ...]
"""
# Versión optimizada: 200 tokens
GOOD_SYSTEM_PROMPT = """
Eres un revisor de código experto. Prioridades:
1. Seguridad (vulnerabilidades OWASP Top 10)
2. Principios SOLID
3. Complejidad O(n)
Responde: problema | severidad | línea | fix sugerido
"""
Comprimir outputs largos de herramientas
Si una herramienta retorna 10,000 tokens pero solo necesitas 500, estás pagando por tokens innecesarios en el contexto:
options = ClaudeCodeOptions(
system_prompt="""Cuando uses herramientas:
- Al leer archivos, especifica el rango de líneas relevantes (no leas archivos completos)
- Al usar Grep, usa patrones específicos para obtener pocas coincidencias
- Si un archivo es largo, lee solo las funciones relevantes
- Nunca leas más de 200 líneas consecutivas a la vez
""",
)
Resumen periódico en sesiones largas
En sessions con muchos turns, el contexto crece y se vuelve costoso. Estrategia: resumir el progreso periódicamente.
Python:
from claude_code_sdk import query, ClaudeCodeOptions
async def long_session_with_summarization(initial_task: str, workspace: str) -> str:
"""
Para tareas largas, divide en subtareas y resume el progreso.
Evita contextos enormes que consumen muchos tokens.
"""
# Fase 1: Planificar
plan_options = ClaudeCodeOptions(
cwd=workspace,
allowed_tools=["View", "GlobTool"],
max_turns=5,
model="claude-haiku-3-5",
)
plan = ""
async for message in query(
prompt=f"Crea un plan de 5 pasos para: {initial_task}. Solo el plan, sin ejecutar.",
options=plan_options
):
if hasattr(message, 'result'):
plan = message.result
# Fase 2: Ejecutar paso a paso (cada fase con contexto fresco)
progress = []
steps = [line for line in plan.split('\n') if line.strip()][:5]
for step in steps:
step_context = "\n".join([
f"Tarea original: {initial_task}",
f"Plan:\n{plan}",
f"Progreso hasta ahora:\n{chr(10).join(progress[-2:])}", # Solo últimos 2 pasos
f"Paso actual a ejecutar: {step}",
])
step_options = ClaudeCodeOptions(
cwd=workspace,
allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
max_turns=10,
)
step_result = ""
async for message in query(prompt=step_context, options=step_options):
if hasattr(message, 'result'):
step_result = message.result
# Resumir el resultado del paso (no acumular el texto completo)
progress.append(f"✓ {step[:50]}: {step_result[:100]}")
return "\n".join(progress)
Estimación de tokens antes de ejecutar
def estimate_tokens(text: str) -> int:
"""Estimación rápida: ~4 caracteres por token en inglés, ~2-3 en español."""
return len(text) // 3 # Conservador para español
def will_exceed_context(prompt: str, system_prompt: str, max_context: int = 100_000) -> bool:
estimated = estimate_tokens(prompt) + estimate_tokens(system_prompt)
return estimated > max_context * 0.8 # 80% del límite como buffer
# Uso
if will_exceed_context(my_prompt, my_system_prompt):
# Dividir la tarea en partes más pequeñas
print("La tarea es demasiado grande, dividiendo...")
10. Monitoreo de Costos en Producción
Dashboard de costos por agente y usuario
Python con Prometheus:
from prometheus_client import Counter, Histogram, start_http_server
from claude_code_sdk import query, ClaudeCodeOptions
# Métricas de Prometheus
agent_queries_total = Counter(
'agent_queries_total',
'Total de queries al agente',
['agent_name', 'user_id', 'model', 'status']
)
agent_cost_total = Counter(
'agent_cost_usd_total',
'Costo total en USD',
['agent_name', 'user_id', 'model']
)
agent_duration_seconds = Histogram(
'agent_duration_seconds',
'Duración de queries en segundos',
['agent_name', 'model'],
buckets=[1, 5, 10, 30, 60, 120, 300]
)
class MonitoredAgent:
def __init__(self, agent_name: str):
self.agent_name = agent_name
async def run(self, user_id: str, task: str, options: ClaudeCodeOptions) -> str:
import time
model = getattr(options, 'model', 'unknown')
start_time = time.time()
status = "success"
try:
result = ""
total_cost = 0.0
async for message in query(prompt=task, options=options):
if hasattr(message, 'cost_usd') and message.cost_usd:
total_cost += message.cost_usd
if hasattr(message, 'result'):
result = message.result
# Registrar costo
agent_cost_total.labels(
agent_name=self.agent_name,
user_id=user_id,
model=model
).inc(total_cost)
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
agent_queries_total.labels(
agent_name=self.agent_name,
user_id=user_id,
model=model,
status=status
).inc()
agent_duration_seconds.labels(
agent_name=self.agent_name,
model=model
).observe(duration)
# Iniciar servidor de métricas en puerto 9090
start_http_server(9090)
# Uso
code_reviewer = MonitoredAgent("code_reviewer")
result = await code_reviewer.run(
user_id="user123",
task="Revisa src/main.py",
options=ClaudeCodeOptions(allowed_tools=["View"], max_turns=5)
)
Alertas cuando el costo supera umbral
Python con alertas por email:
import smtplib
from email.mime.text import MIMEText
import os
DAILY_COST_ALERT_THRESHOLD_USD = 50.0
async def check_and_alert_costs(user_id: str, current_daily_cost: float):
"""Envía alerta si el costo diario supera el umbral."""
if current_daily_cost > DAILY_COST_ALERT_THRESHOLD_USD:
subject = f"[ALERTA] Costo de agente alto: ${current_daily_cost:.2f}"
body = f"""
El usuario {user_id} ha superado el umbral de costo diario.
Costo actual del día: ${current_daily_cost:.2f}
Umbral configurado: ${DAILY_COST_ALERT_THRESHOLD_USD:.2f}
Acciones posibles:
- Revisar si hay queries ineficientes
- Verificar si hay uso no autorizado
- Considerar aumentar el umbral si el uso es legítimo
"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = os.environ["ALERT_EMAIL_FROM"]
msg['To'] = os.environ["ALERT_EMAIL_TO"]
with smtplib.SMTP(os.environ["SMTP_HOST"]) as server:
server.send_message(msg)
print(f"[ALERT] Email enviado: costo ${current_daily_cost:.2f} para {user_id}")
Cost tracker completo
Python:
import json
from datetime import datetime, timezone
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
class CostTracker:
def __init__(self, log_path: str):
self.log_path = Path(log_path)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
def record(self, user_id: str, agent_name: str, model: str, cost_usd: float, turns: int):
entry = {
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
"user_id": user_id,
"agent_name": agent_name,
"model": model,
"cost_usd": cost_usd,
"turns": turns,
"date": datetime.now().strftime("%Y-%m-%d"),
}
with open(self.log_path, "a") as f:
f.write(json.dumps(entry) + "\n")
def daily_report(self, date: str | None = None) -> dict:
target_date = date or datetime.now().strftime("%Y-%m-%d")
entries = []
if self.log_path.exists():
with open(self.log_path) as f:
for line in f:
entry = json.loads(line)
if entry["date"] == target_date:
entries.append(entry)
# Agrupar por agente
by_agent: dict[str, dict] = {}
for entry in entries:
agent = entry["agent_name"]
if agent not in by_agent:
by_agent[agent] = {"total_cost": 0.0, "queries": 0}
by_agent[agent]["total_cost"] += entry["cost_usd"]
by_agent[agent]["queries"] += 1
total = sum(e["cost_usd"] for e in entries)
return {
"date": target_date,
"total_cost_usd": total,
"total_queries": len(entries),
"by_agent": by_agent,
}
def monthly_report(self, year: int, month: int) -> dict:
prefix = f"{year}-{month:02d}"
entries = []
if self.log_path.exists():
with open(self.log_path) as f:
for line in f:
entry = json.loads(line)
if entry["date"].startswith(prefix):
entries.append(entry)
total = sum(e["cost_usd"] for e in entries)
by_model: dict[str, float] = {}
for entry in entries:
model = entry["model"]
by_model[model] = by_model.get(model, 0.0) + entry["cost_usd"]
return {
"period": f"{year}-{month:02d}",
"total_cost_usd": total,
"total_queries": len(entries),
"by_model": by_model,
}
# Uso
tracker = CostTracker("/var/log/agent/costs.jsonl")
async def tracked_agent(user_id: str, agent_name: str, task: str) -> str:
options = ClaudeCodeOptions(
allowed_tools=["View", "GlobTool"],
max_turns=10,
model="claude-haiku-3-5",
)
total_cost = 0.0
total_turns = 0
result = ""
async for message in query(prompt=task, options=options):
if hasattr(message, 'cost_usd') and message.cost_usd:
total_cost += message.cost_usd
if hasattr(message, 'num_turns'):
total_turns = message.num_turns
if hasattr(message, 'result'):
result = message.result
tracker.record(
user_id=user_id,
agent_name=agent_name,
model="claude-haiku-3-5",
cost_usd=total_cost,
turns=total_turns,
)
# Verificar alerta de costo
daily = tracker.daily_report()
if daily["total_cost_usd"] > DAILY_COST_ALERT_THRESHOLD_USD:
await check_and_alert_costs(user_id, daily["total_cost_usd"])
return result
# Reporte mensual automatizado
async def generate_monthly_report():
now = datetime.now()
report = tracker.monthly_report(now.year, now.month)
print(f"\n=== Reporte {report['period']} ===")
print(f"Costo total: ${report['total_cost_usd']:.4f}")
print(f"Queries totales: {report['total_queries']}")
print("Por modelo:")
for model, cost in sorted(report['by_model'].items(), key=lambda x: -x[1]):
print(f" {model}: ${cost:.4f}")
Resumen: Checklist de Optimización
graph LR
subgraph I["Impacto Alto"]
I1[Selección correcta de modelo]
I2[Prompt caching activado]
I3[Herramientas mínimas]
end
subgraph M["Impacto Medio"]
M1[Paralelismo con semaphore]
M2[Cache de resultados Redis]
M3[Batch processing]
end
subgraph B["Impacto Bajo pero acumulativo"]
B1[System_prompt compacto]
B2[Grep antes de Read]
B3[Compresión de outputs]
end
I -->|"Implementar primero"| M
M -->|"Luego"| B
| Técnica | Ahorro típico | Esfuerzo |
|---|---|---|
| Modelo correcto | 5-20x costo | Bajo |
| Prompt caching | 50-90% input tokens | Bajo |
| Paralelismo | Tiempo ÷ N workers | Medio |
| Cache de resultados | 100% si hit | Medio |
| System_prompt compacto | 10-30% tokens | Bajo |
| Grep antes de Read | 20-40% turns | Bajo |
| Batch processing | 50-80% queries | Medio |
La optimización más impactante es seleccionar el modelo correcto. Un Haiku en lugar de Opus para tareas simples puede reducir el costo hasta 60x. El segundo impacto mayor es el prompt caching, especialmente si tienes un system_prompt largo que se repite.
El siguiente capítulo cubre cómo integrar estos agentes optimizados en frameworks web como FastAPI, Express y Next.js.