Capítulo 15: Performance y Optimización

Por: Artiko
claudeagent-sdkperformanceoptimizacioncachingcosto

Capítulo 15: Performance y Optimización

Un agente lento o costoso no llega a producción, o si llega, no sobrevive. Este capítulo cubre todas las técnicas para construir agentes que sean rápidos, económicos y escalables — sin sacrificar calidad.

La optimización de agentes tiene tres dimensiones que compiten entre sí:

graph TD
    T[Triángulo de Trade-offs]
    T --> V[Velocidad\ntime-to-complete]
    T --> C[Costo\nUSD por query]
    T --> Q[Calidad\nresultados correctos]

    V <-->|"Modelo rápido = peor calidad"| Q
    C <-->|"Modelo barato = peor calidad"| Q
    V <-->|"Más rápido = más tokens = más costo"| C

La clave es entender cuándo cada dimensión importa más para tu caso de uso específico.


1. Métricas de Performance de Agentes

Las métricas correctas para medir

Antes de optimizar, necesitas medir. Las métricas de un agente son distintas a las de una API REST simple:

Latencia:

Throughput:

Costo:

Cómo medir: instrumentación básica

Python:

from claude_code_sdk import query, ClaudeCodeOptions
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AgentMetrics:
    start_time: float = field(default_factory=time.time)
    first_token_time: Optional[float] = None
    end_time: Optional[float] = None
    total_cost_usd: float = 0.0
    total_turns: int = 0
    tool_calls: list[str] = field(default_factory=list)

    @property
    def ttft_ms(self) -> Optional[float]:
        if self.first_token_time:
            return (self.first_token_time - self.start_time) * 1000
        return None

    @property
    def ttc_ms(self) -> Optional[float]:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return None

    def report(self) -> str:
        return (
            f"TTFT: {self.ttft_ms:.0f}ms | "
            f"TTC: {self.ttc_ms:.0f}ms | "
            f"Costo: ${self.total_cost_usd:.4f} | "
            f"Turns: {self.total_turns} | "
            f"Tools: {', '.join(self.tool_calls)}"
        )

async def measured_agent(task: str, options: ClaudeCodeOptions) -> tuple[str, AgentMetrics]:
    metrics = AgentMetrics()
    result = ""

    async for message in query(prompt=task, options=options):
        # Detectar primer token (primera respuesta del modelo)
        if metrics.first_token_time is None:
            metrics.first_token_time = time.time()

        # Contabilizar costo
        if hasattr(message, 'cost_usd') and message.cost_usd:
            metrics.total_cost_usd += message.cost_usd

        # Contar turns y herramientas
        if hasattr(message, 'num_turns'):
            metrics.total_turns = message.num_turns

        if hasattr(message, 'result'):
            result = message.result

    metrics.end_time = time.time()
    print(f"[METRICS] {metrics.report()}")

    return result, metrics

TypeScript:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

interface AgentMetrics {
  startTime: number;
  firstTokenTime?: number;
  endTime?: number;
  totalCostUsd: number;
  totalTurns: number;
  toolCalls: string[];
}

function createMetrics(): AgentMetrics {
  return {
    startTime: Date.now(),
    totalCostUsd: 0,
    totalTurns: 0,
    toolCalls: [],
  };
}

function reportMetrics(m: AgentMetrics): string {
  const ttft = m.firstTokenTime ? m.firstTokenTime - m.startTime : null;
  const ttc = m.endTime ? m.endTime - m.startTime : null;
  return `TTFT: ${ttft?.toFixed(0) ?? "N/A"}ms | TTC: ${ttc?.toFixed(0) ?? "N/A"}ms | Costo: $${m.totalCostUsd.toFixed(4)}`;
}

async function measuredAgent(task: string, options: ClaudeCodeOptions): Promise<[string, AgentMetrics]> {
  const metrics = createMetrics();
  let result = "";

  for await (const message of query({ prompt: task, options })) {
    if (!metrics.firstTokenTime) {
      metrics.firstTokenTime = Date.now();
    }
    if (message.type === "result") {
      result = message.result;
    }
  }

  metrics.endTime = Date.now();
  console.log(`[METRICS] ${reportMetrics(metrics)}`);

  return [result, metrics];
}

Benchmark: línea de base

Antes de optimizar, establece una línea de base ejecutando la misma tarea 10 veces y midiendo:


2. Selección de Modelo por Tarea

Entendiendo la familia de modelos Claude

La selección de modelo es la optimización de mayor impacto. Un error aquí puede multiplicar por 10x tu costo o reducir a la mitad la calidad.

Claude Opus 4: El más capaz.

Claude Sonnet 4: El punto dulce.

Claude Haiku 3.5: El rápido y barato.

Router de modelos basado en complejidad

Python:

from claude_code_sdk import query, ClaudeCodeOptions
import re

def estimate_task_complexity(task: str) -> str:
    """
    Estima la complejidad de una tarea y retorna el modelo recomendado.
    Retorna: "claude-opus-4-5" | "claude-sonnet-4-5" | "claude-haiku-3-5"
    """
    task_lower = task.lower()
    word_count = len(task.split())

    # Indicadores de alta complejidad -> Opus
    high_complexity_signals = [
        r"\barchitectura\b",
        r"\bdiseño\s+de\s+sistema",
        r"\boptimización\s+de\s+algoritmo",
        r"\brefactorización\s+completa",
        r"\bseguridad\s+(crítica|producción)",
        r"\bmigración",
        r"\banalyze\s+(entire|whole|complete|all)",
        r"\bmultiple\s+files",
        r"\bcomplex\b.*\barchitecture\b",
    ]

    for signal in high_complexity_signals:
        if re.search(signal, task_lower):
            return "claude-opus-4-5"

    # Indicadores de baja complejidad -> Haiku
    low_complexity_signals = [
        r"\bclasifica\b",
        r"\bextrae\b.*\b(campo|dato|valor)\b",
        r"\bformatea\b",
        r"\bresume\b",
        r"\btraduce\b",
        r"\bconvierte\b.*\bformato\b",
        r"^(yes|no|true|false)",  # Pregunta de sí/no
    ]

    # Tarea corta con baja complejidad
    if word_count < 20:
        for signal in low_complexity_signals:
            if re.search(signal, task_lower):
                return "claude-haiku-3-5"

    # Default: Sonnet para la mayoría de tareas
    return "claude-sonnet-4-5"

async def routed_agent(task: str, workspace: str) -> str:
    model = estimate_task_complexity(task)
    print(f"[ROUTER] Tarea asignada a: {model}")

    options = ClaudeCodeOptions(
        model=model,
        cwd=workspace,
        allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
        max_turns=20,
    )

    result = ""
    async for message in query(prompt=task, options=options):
        if hasattr(message, 'result'):
            result = message.result

    return result

# Ejemplos de routing
# "Clasifica este archivo como config o código" -> Haiku
# "Refactoriza la clase UserService" -> Sonnet
# "Diseña la arquitectura de microservicios para este monolito" -> Opus

TypeScript:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

type ModelId = "claude-opus-4-5" | "claude-sonnet-4-5" | "claude-haiku-3-5";

function estimateTaskComplexity(task: string): ModelId {
  const taskLower = task.toLowerCase();
  const wordCount = task.split(/\s+/).length;

  const highComplexitySignals = [
    /\barchitecture\b/,
    /\bsystem design\b/,
    /\bcomplete refactor/,
    /\bmigration\b/,
    /\bmultiple files\b/,
  ];

  for (const signal of highComplexitySignals) {
    if (signal.test(taskLower)) return "claude-opus-4-5";
  }

  const lowComplexitySignals = [
    /\bclassify\b/,
    /\bextract\b.*\b(field|value)\b/,
    /\bformat\b/,
    /\bsummarize\b/,
    /\btranslate\b/,
  ];

  if (wordCount < 20) {
    for (const signal of lowComplexitySignals) {
      if (signal.test(taskLower)) return "claude-haiku-3-5";
    }
  }

  return "claude-sonnet-4-5";
}

async function routedAgent(task: string, workspace: string): Promise<string> {
  const model = estimateTaskComplexity(task);
  console.log(`[ROUTER] Modelo seleccionado: ${model}`);

  const options: ClaudeCodeOptions = {
    model,
    cwd: workspace,
    allowedTools: ["View", "GlobTool", "GrepTool", "Edit"],
    maxTurns: 20,
  };

  let result = "";
  for await (const message of query({ prompt: task, options })) {
    if (message.type === "result") {
      result = message.result;
    }
  }
  return result;
}

Estimación de costo antes de ejecutar

# Precios aproximados por millón de tokens (verificar en anthropic.com)
MODEL_PRICING = {
    "claude-opus-4-5": {"input": 15.0, "output": 75.0},
    "claude-sonnet-4-5": {"input": 3.0, "output": 15.0},
    "claude-haiku-3-5": {"input": 0.25, "output": 1.25},
}

def estimate_cost(prompt: str, model: str, expected_output_tokens: int = 500) -> float:
    """Estimación rápida del costo antes de ejecutar."""
    # Aprox: 1 token = 4 caracteres
    input_tokens = len(prompt) / 4
    pricing = MODEL_PRICING.get(model, MODEL_PRICING["claude-sonnet-4-5"])

    input_cost = (input_tokens / 1_000_000) * pricing["input"]
    output_cost = (expected_output_tokens / 1_000_000) * pricing["output"]

    return input_cost + output_cost

# Ejemplo
cost = estimate_cost(
    prompt="Analiza este archivo de 10KB de código...",
    model="claude-opus-4-5",
    expected_output_tokens=1000
)
print(f"Costo estimado: ${cost:.4f}")

3. Prompt Caching

Cómo funciona el prompt caching de Anthropic

El prompt caching permite guardar en caché partes del contexto que se envían repetidamente. Cuando el mismo contenido se envía de nuevo, Anthropic lo sirve desde caché en lugar de procesarlo de nuevo.

Ahorro típico:

Reglas de invalidación de caché

El caché se invalida cuando:

  1. El contenido cambia (cualquier carácter diferente)
  2. El modelo cambia
  3. La posición del breakpoint cambia
  4. Han pasado más de 5 minutos sin uso (por defecto)

Anti-patrón crítico: system_prompt dinámico

# MAL: El timestamp hace que el caché se invalide en cada request
async def bad_agent(task: str, user_id: str):
    options = ClaudeCodeOptions(
        system_prompt=f"""
        Eres un asistente de código.
        Usuario actual: {user_id}
        Timestamp: {datetime.now()}  # <- Esto invalida el caché cada segundo
        Reglas: [... 5000 tokens de reglas ...]
        """
    )

Correcto: separar la parte estática de la dinámica

# BIEN: La parte estática (larga) va en el system_prompt cacheado
# La parte dinámica va en el prompt del usuario

STATIC_SYSTEM_PROMPT = """
Eres un asistente experto en Python y TypeScript.

REGLAS DE CÓDIGO:
[... 5000 tokens de reglas estáticas que nunca cambian ...]

PATRONES A SEGUIR:
[... más reglas estáticas ...]
"""  # Este prompt largo se cacheará

async def good_agent(task: str, user_id: str):
    options = ClaudeCodeOptions(
        system_prompt=STATIC_SYSTEM_PROMPT,  # Siempre igual = siempre cacheado
        max_turns=20,
    )
    # La info dinámica va en el prompt, no en el system_prompt
    full_task = f"[Usuario: {user_id}]\n\nTarea: {task}"

    async for message in query(prompt=full_task, options=options):
        if hasattr(message, 'result'):
            print(message.result)

Ejemplo completo: agente con system_prompt cacheado

Python:

from claude_code_sdk import query, ClaudeCodeOptions
import os

# El system_prompt se define una sola vez y nunca cambia
CODE_REVIEW_SYSTEM_PROMPT = """
Eres un experto revisor de código con 20 años de experiencia en Python, TypeScript y sistemas distribuidos.

## PRINCIPIOS QUE APLICAS

### Seguridad
- Detectas inyecciones SQL, XSS, CSRF, y otros vectores de ataque OWASP Top 10
- Identificas hardcoded secrets, tokens en código fuente
- Revisas validación de inputs y sanitización de outputs
- Detectas problemas de autorización y autenticación

### Performance
- Identifies N+1 queries en ORMs
- Detectas loops innecesarios y complejidad algorítmica subóptima
- Señalas memoria leaks evidentes
- Revisas uso ineficiente de recursos

### Mantenibilidad
- Aplicas principios SOLID: SRP, OCP, LSP, ISP, DIP
- Detectas código duplicado (DRY violations)
- Evalúas complejidad ciclomática
- Revisas naming conventions y documentación

### Patrones de arquitectura
- Detectas violaciones de arquitectura hexagonal
- Identificas acoplamiento excesivo entre módulos
- Señalas dependencias circulares
- Revisas separación de responsabilidades

## FORMATO DE RESPUESTA

Para cada problema encontrado:
1. **Severidad**: CRITICAL / HIGH / MEDIUM / LOW / INFO
2. **Categoría**: Security / Performance / Maintainability / Architecture
3. **Descripción**: Qué está mal y por qué es un problema
4. **Línea**: Dónde está el problema (si aplica)
5. **Sugerencia**: Cómo corregirlo

Al final, un resumen con:
- Total de issues por severidad
- Puntuación general (0-100)
- Top 3 prioridades

## ESTILO
- Sé directo y específico, sin rodeos
- Incluye ejemplos de código corregido cuando sea útil
- Si el código está bien, dilo claramente
"""

async def cached_code_review(file_path: str, specific_concern: str = "") -> str:
    """
    Revisa código usando prompt caching.
    El system_prompt largo se cacheará después de la primera llamada.
    """
    options = ClaudeCodeOptions(
        system_prompt=CODE_REVIEW_SYSTEM_PROMPT,  # Siempre idéntico = cacheado
        allowed_tools=["View", "GlobTool", "GrepTool"],
        max_turns=5,
        model="claude-sonnet-4-5",
    )

    prompt = f"Revisa el código en {file_path}."
    if specific_concern:
        prompt += f"\n\nPresta especial atención a: {specific_concern}"

    result = ""
    async for message in query(prompt=prompt, options=options):
        if hasattr(message, 'result'):
            result = message.result

    return result

# Uso: la primera llamada paga el costo completo de tokenización del system_prompt
# Las llamadas siguientes usan el caché (mucho más barato)
async def review_multiple_files(file_paths: list[str]) -> dict[str, str]:
    results = {}
    for path in file_paths:
        # Cada llamada usa el mismo system_prompt cacheado
        results[path] = await cached_code_review(path)
        print(f"Revisado: {path}")
    return results

TypeScript:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

const CODE_REVIEW_SYSTEM_PROMPT = `
Eres un experto revisor de código con 20 años de experiencia.

## PRINCIPIOS QUE APLICAS

### Seguridad
- Detectas inyecciones SQL, XSS, CSRF y otros vectores OWASP Top 10
- Identificas hardcoded secrets, tokens en código fuente
- Revisas validación de inputs

### Performance
- Identificas N+1 queries en ORMs
- Detectas loops innecesarios y complejidad algorítmica subóptima
- Señalas memoria leaks evidentes

### Mantenibilidad
- Aplicas principios SOLID
- Detectas código duplicado (DRY violations)
- Evalúas complejidad ciclomática

## FORMATO DE RESPUESTA
Para cada problema: Severidad | Categoría | Descripción | Línea | Sugerencia
Al final: resumen con puntuación 0-100 y top 3 prioridades
`.trim();

async function cachedCodeReview(filePath: string, specificConcern?: string): Promise<string> {
  const options: ClaudeCodeOptions = {
    systemPrompt: CODE_REVIEW_SYSTEM_PROMPT, // Siempre idéntico = cacheado
    allowedTools: ["View", "GlobTool", "GrepTool"],
    maxTurns: 5,
    model: "claude-sonnet-4-5",
  };

  let prompt = `Revisa el código en ${filePath}.`;
  if (specificConcern) {
    prompt += `\n\nPresta especial atención a: ${specificConcern}`;
  }

  let result = "";
  for await (const message of query({ prompt, options })) {
    if (message.type === "result") {
      result = message.result;
    }
  }
  return result;
}

async function reviewMultipleFiles(filePaths: string[]): Promise<Record<string, string>> {
  const results: Record<string, string> = {};
  for (const path of filePaths) {
    results[path] = await cachedCodeReview(path);
    console.log(`Revisado: ${path}`);
  }
  return results;
}

Midiendo el ahorro del caché

async def measure_cache_savings(task: str, options: ClaudeCodeOptions, repetitions: int = 5):
    """Mide el ahorro del caché en múltiples repeticiones."""
    costs = []

    for i in range(repetitions):
        total_cost = 0.0
        async for message in query(prompt=task, options=options):
            if hasattr(message, 'cost_usd') and message.cost_usd:
                total_cost += message.cost_usd

        costs.append(total_cost)
        print(f"Repetición {i+1}: ${total_cost:.6f}")

    print(f"\nPrimera llamada (sin caché): ${costs[0]:.6f}")
    print(f"Promedio con caché: ${sum(costs[1:]) / len(costs[1:]):.6f}")
    print(f"Ahorro: {(1 - sum(costs[1:]) / len(costs[1:]) / costs[0]) * 100:.1f}%")

4. Streaming para Latencia Percibida

Por qué streaming mejora la UX

Sin streaming, el usuario ve una pantalla en blanco hasta que el agente termina. Con streaming, ve el output aparecer progresivamente. Para tareas de 30 segundos, esto hace una diferencia enorme en la percepción.

Python - streaming a terminal:

from claude_code_sdk import query, ClaudeCodeOptions
import sys

async def streaming_agent(task: str) -> str:
    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool"],
        max_turns=10,
    )

    full_result = ""
    print("Agente: ", end="", flush=True)

    async for message in query(prompt=task, options=options):
        # Mostrar progreso mientras el agente trabaja
        if hasattr(message, 'type'):
            if message.type == "assistant":
                # Mostrar el texto a medida que llega
                if hasattr(message, 'message') and hasattr(message.message, 'content'):
                    for block in message.message.content:
                        if hasattr(block, 'text'):
                            print(block.text, end="", flush=True)

            elif message.type == "tool_use":
                # Notificar al usuario qué herramienta se está usando
                tool_name = getattr(message, 'tool_name', 'unknown')
                print(f"\n[Usando: {tool_name}] ", end="", flush=True)

            elif message.type == "result":
                full_result = message.result
                print()  # Nueva línea al final

    return full_result

TypeScript - streaming a SSE:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import { Response } from "express";

async function streamAgentToSSE(task: string, res: Response): Promise<void> {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const options: ClaudeCodeOptions = {
    allowedTools: ["View", "GlobTool"],
    maxTurns: 10,
  };

  for await (const message of query({ prompt: task, options })) {
    if (message.type === "assistant") {
      const content = (message as any).message?.content ?? [];
      for (const block of content) {
        if (block.type === "text") {
          res.write(`data: ${JSON.stringify({ type: "text", text: block.text })}\n\n`);
        }
      }
    } else if (message.type === "result") {
      res.write(`data: ${JSON.stringify({ type: "done", result: message.result })}\n\n`);
    }
  }

  res.end();
}

Cuándo el streaming NO ayuda

El streaming no mejora el TTC (tiempo total). Solo mejora la latencia percibida. No usar streaming para:


5. Paralelismo y Concurrencia

El problema con queries secuenciales

sequenceDiagram
    participant C as Cliente
    participant A1 as Agente 1
    participant A2 as Agente 2
    participant A3 as Agente 3

    Note over C,A3: Secuencial: 30 segundos total
    C->>A1: Query 1
    A1-->>C: Respuesta 1 (10s)
    C->>A2: Query 2
    A2-->>C: Respuesta 2 (10s)
    C->>A3: Query 3
    A3-->>C: Respuesta 3 (10s)

    Note over C,A3: Paralelo: ~10 segundos total
    C->>A1: Query 1
    C->>A2: Query 2
    C->>A3: Query 3
    A1-->>C: Respuesta 1
    A2-->>C: Respuesta 2
    A3-->>C: Respuesta 3

asyncio.gather para múltiples queries

Python:

from claude_code_sdk import query, ClaudeCodeOptions
import asyncio
from typing import Any

async def single_agent_task(task: str, task_id: str) -> tuple[str, str]:
    """Ejecuta una tarea individual."""
    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool", "GrepTool"],
        max_turns=10,
        model="claude-haiku-3-5",  # Haiku para tareas simples paralelas
    )

    result = ""
    async for message in query(prompt=task, options=options):
        if hasattr(message, 'result'):
            result = message.result

    return task_id, result

async def parallel_agent_queries(tasks: list[tuple[str, str]]) -> dict[str, str]:
    """
    Ejecuta múltiples queries en paralelo.
    tasks: lista de (task_id, task_prompt)
    """
    coroutines = [single_agent_task(task, task_id) for task_id, task in tasks]
    results = await asyncio.gather(*coroutines, return_exceptions=True)

    output = {}
    for result in results:
        if isinstance(result, Exception):
            print(f"Error en una tarea: {result}")
        else:
            task_id, task_result = result
            output[task_id] = task_result

    return output

# Ejemplo: analizar múltiples archivos en paralelo
async def analyze_files_parallel(file_paths: list[str]) -> dict[str, str]:
    tasks = [
        (path, f"Analiza {path} y lista los problemas de seguridad")
        for path in file_paths
    ]
    return await parallel_agent_queries(tasks)

Semaphore para limitar concurrencia (rate limit)

La API de Anthropic tiene límites de requests por minuto. Un semaphore evita sobrepasarlos:

Python:

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions

async def process_with_concurrency_limit(
    tasks: list[str],
    max_concurrent: int = 5,
    workspace: str = "/workspace"
) -> list[str]:
    """
    Procesa tareas en paralelo con límite de concurrencia.
    max_concurrent: máximo de queries simultáneos a la API.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_task(task: str) -> str:
        async with semaphore:  # Solo `max_concurrent` tareas corren a la vez
            options = ClaudeCodeOptions(
                cwd=workspace,
                allowed_tools=["View", "GlobTool"],
                max_turns=5,
                model="claude-haiku-3-5",
            )

            result = ""
            async for message in query(prompt=task, options=options):
                if hasattr(message, 'result'):
                    result = message.result

            return result

    return await asyncio.gather(*[bounded_task(task) for task in tasks])

# Uso: procesar 100 archivos con máximo 5 en paralelo
async def main():
    files = [f"archivo_{i}.py" for i in range(100)]
    tasks = [f"Revisa {f} por vulnerabilidades de seguridad" for f in files]

    results = await process_with_concurrency_limit(tasks, max_concurrent=5)
    print(f"Procesados {len(results)} archivos")

Ejemplo: procesar 100 archivos en paralelo

Python completo:

from claude_code_sdk import query, ClaudeCodeOptions
import asyncio
import time
from pathlib import Path

async def analyze_single_file(
    file_path: str,
    semaphore: asyncio.Semaphore,
    results: dict,
    errors: dict
) -> None:
    """Analiza un archivo individual con control de concurrencia."""
    async with semaphore:
        try:
            options = ClaudeCodeOptions(
                allowed_tools=["View"],
                max_turns=3,
                model="claude-haiku-3-5",  # Rápido y barato para análisis simple
            )

            file_result = ""
            async for message in query(
                prompt=f"Lista los imports y exportaciones principales de {file_path}",
                options=options
            ):
                if hasattr(message, 'result'):
                    file_result = message.result

            results[file_path] = file_result

        except Exception as e:
            errors[file_path] = str(e)

async def analyze_codebase(
    directory: str,
    pattern: str = "**/*.py",
    max_concurrent: int = 10
) -> dict[str, str]:
    """Analiza todos los archivos que coincidan con el patrón en paralelo."""
    files = list(Path(directory).glob(pattern))
    print(f"Analizando {len(files)} archivos con {max_concurrent} workers...")

    semaphore = asyncio.Semaphore(max_concurrent)
    results: dict[str, str] = {}
    errors: dict[str, str] = {}

    start_time = time.time()

    tasks = [
        analyze_single_file(str(f), semaphore, results, errors)
        for f in files
    ]

    await asyncio.gather(*tasks)

    elapsed = time.time() - start_time
    print(f"Completado en {elapsed:.1f}s")
    print(f"Exitosos: {len(results)}, Errores: {len(errors)}")

    return results

TypeScript: Promise.all con límite de concurrencia

TypeScript:

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

async function processBatch<T>(
  items: T[],
  processor: (item: T) => Promise<string>,
  maxConcurrent: number
): Promise<Array<{ item: T; result?: string; error?: string }>> {
  const results: Array<{ item: T; result?: string; error?: string }> = [];

  // Dividir en chunks del tamaño de maxConcurrent
  for (let i = 0; i < items.length; i += maxConcurrent) {
    const chunk = items.slice(i, i + maxConcurrent);

    const chunkResults = await Promise.allSettled(
      chunk.map((item) => processor(item))
    );

    for (let j = 0; j < chunk.length; j++) {
      const outcome = chunkResults[j];
      if (outcome.status === "fulfilled") {
        results.push({ item: chunk[j], result: outcome.value });
      } else {
        results.push({ item: chunk[j], error: outcome.reason?.message });
      }
    }

    console.log(`Procesados ${Math.min(i + maxConcurrent, items.length)}/${items.length}`);
  }

  return results;
}

async function analyzeFiles(filePaths: string[]): Promise<void> {
  const analyzeFile = async (filePath: string): Promise<string> => {
    const options: ClaudeCodeOptions = {
      allowedTools: ["View"],
      maxTurns: 3,
      model: "claude-haiku-3-5",
    };

    let result = "";
    for await (const message of query({
      prompt: `Analiza brevemente ${filePath}`,
      options,
    })) {
      if (message.type === "result") result = message.result;
    }
    return result;
  };

  const results = await processBatch(filePaths, analyzeFile, 5);
  console.log(`Analizados: ${results.filter((r) => r.result).length} archivos`);
}

Patrón producer/consumer

Para pipelines complejos donde el análisis alimenta acciones:

Python:

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions

async def producer(file_queue: asyncio.Queue, files: list[str]):
    """Produce archivos para analizar."""
    for file in files:
        await file_queue.put(file)
    # Señal de fin
    for _ in range(5):  # Un "poison pill" por worker
        await file_queue.put(None)

async def consumer(
    worker_id: int,
    file_queue: asyncio.Queue,
    result_queue: asyncio.Queue
):
    """Consume archivos, los analiza y pone resultados en result_queue."""
    while True:
        file = await file_queue.get()
        if file is None:
            break  # Poison pill: terminar

        options = ClaudeCodeOptions(
            allowed_tools=["View"],
            max_turns=3,
            model="claude-haiku-3-5",
        )

        result = ""
        async for message in query(
            prompt=f"¿Tiene este archivo tests? Responde solo: SÍ o NO. Archivo: {file}",
            options=options
        ):
            if hasattr(message, 'result'):
                result = message.result

        await result_queue.put({"file": file, "result": result, "worker": worker_id})
        file_queue.task_done()

async def producer_consumer_pipeline(files: list[str], num_workers: int = 5):
    file_queue: asyncio.Queue = asyncio.Queue(maxsize=20)
    result_queue: asyncio.Queue = asyncio.Queue()

    # Iniciar producer y consumers en paralelo
    producers = [asyncio.create_task(producer(file_queue, files))]
    consumers = [
        asyncio.create_task(consumer(i, file_queue, result_queue))
        for i in range(num_workers)
    ]

    await asyncio.gather(*producers)
    await asyncio.gather(*consumers)

    # Recolectar todos los resultados
    results = []
    while not result_queue.empty():
        results.append(await result_queue.get())

    return results

6. Optimización de Herramientas

Jerarquía de herramientas por costo de tokens

graph TD
    A[Glob - buscar archivos] -->|"Muy barato: lista de paths"| B
    B[Grep - buscar contenido] -->|"Barato: solo líneas que coinciden"| C
    C[View con range - leer sección] -->|"Moderado: solo parte del archivo"| D
    D[View completo - leer archivo] -->|"Costoso: todo el contenido"| E
    E[Múltiples Reads] -->|"Muy costoso: n × contenido"| F
    F[Bash - ejecutar y leer output] -->|"El más costoso y poderoso"| G[Resultado]

Grep antes de Read

Anti-patrón:

# MAL: Leer 50 archivos para encontrar la función
options = ClaudeCodeOptions(
    system_prompt="Encuentra la función 'process_payment' y explica cómo funciona.",
    allowed_tools=["View", "GlobTool"],
)
# El agente probablemente leerá muchos archivos antes de encontrar la función

Optimizado:

# BIEN: Guiar al agente para usar Grep primero
options = ClaudeCodeOptions(
    system_prompt="""Cuando necesites encontrar código:
    1. PRIMERO usa GrepTool para encontrar el archivo que contiene el código
    2. LUEGO usa View con un rango de líneas para leer solo la parte relevante
    3. NUNCA leas archivos completos si puedes usar Grep primero
    """,
    allowed_tools=["View", "GlobTool", "GrepTool"],
)

Profiling de uso de herramientas

from collections import defaultdict
from claude_code_sdk import query, ClaudeCodeOptions

class ToolProfiler:
    def __init__(self):
        self.tool_calls = defaultdict(int)
        self.total_queries = 0

    def record(self, messages: list):
        self.total_queries += 1
        for message in messages:
            if hasattr(message, 'type') and message.type == "tool_use":
                tool_name = getattr(message, 'tool_name', 'unknown')
                self.tool_calls[tool_name] += 1

    def report(self):
        print(f"\n=== Tool Profiling Report ({self.total_queries} queries) ===")
        for tool, count in sorted(self.tool_calls.items(), key=lambda x: -x[1]):
            avg = count / self.total_queries
            print(f"  {tool}: {count} calls ({avg:.1f} avg/query)")

profiler = ToolProfiler()

async def profiled_agent(task: str) -> str:
    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
        max_turns=20,
    )

    all_messages = []
    result = ""

    async for message in query(prompt=task, options=options):
        all_messages.append(message)
        if hasattr(message, 'result'):
            result = message.result

    profiler.record(all_messages)
    return result

Combinar consultas en una sola tool call

Ineficiente:

Turn 1: ¿Qué archivos hay en /src?
Turn 2: Lee /src/main.py
Turn 3: Lee /src/utils.py
Turn 4: Busca "import" en todos los archivos

Eficiente (guiar con el system_prompt):

options = ClaudeCodeOptions(
    system_prompt="""Optimiza tu uso de herramientas:
    - Usa GlobTool para listar todos los archivos que necesitas DE UNA VEZ
    - Usa GrepTool para encontrar patrones en MÚLTIPLES archivos simultáneamente
    - Lee solo las secciones relevantes, no archivos completos
    - Planifica antes de ejecutar: determina qué archivos necesitas antes de leerlos
    """,
    allowed_tools=["View", "GlobTool", "GrepTool"],
    max_turns=10,  # Menos turns necesarios con uso eficiente de herramientas
)

7. Caching de Resultados

Cache layer sobre query()

Para queries idénticas (mismo task + mismo contexto), cachear el resultado evita llamadas repetidas a la API:

Python:

import hashlib
import json
import time
from claude_code_sdk import query, ClaudeCodeOptions

class AgentResultCache:
    """Cache en memoria para resultados de queries."""

    def __init__(self, ttl_seconds: int = 3600):
        self._cache: dict[str, tuple[str, float]] = {}
        self.ttl_seconds = ttl_seconds
        self.hits = 0
        self.misses = 0

    def _make_key(self, prompt: str, options: ClaudeCodeOptions) -> str:
        """Genera una clave única basada en el prompt y opciones relevantes."""
        key_data = {
            "prompt": prompt,
            "model": getattr(options, 'model', 'default'),
            "allowed_tools": sorted(getattr(options, 'allowed_tools', [])),
            "system_prompt": getattr(options, 'system_prompt', ''),
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, key: str) -> str | None:
        if key in self._cache:
            result, timestamp = self._cache[key]
            if time.time() - timestamp < self.ttl_seconds:
                self.hits += 1
                return result
            else:
                del self._cache[key]  # Expirado
        self.misses += 1
        return None

    def set(self, key: str, result: str):
        self._cache[key] = (result, time.time())

    def stats(self) -> str:
        total = self.hits + self.misses
        hit_rate = (self.hits / total * 100) if total > 0 else 0
        return f"Cache: {self.hits} hits, {self.misses} misses ({hit_rate:.1f}% hit rate)"

# Instancia global del caché
_cache = AgentResultCache(ttl_seconds=3600)

async def cached_query(prompt: str, options: ClaudeCodeOptions) -> str:
    """Wrapper de query() con caché de resultados."""
    cache_key = _cache._make_key(prompt, options)

    # Verificar caché primero
    cached_result = _cache.get(cache_key)
    if cached_result is not None:
        print("[CACHE HIT]")
        return cached_result

    # Ejecutar query si no hay caché
    print("[CACHE MISS] Ejecutando query...")
    result = ""
    async for message in query(prompt=prompt, options=options):
        if hasattr(message, 'result'):
            result = message.result

    # Guardar en caché
    _cache.set(cache_key, result)
    return result

Redis para cache distribuida

import redis.asyncio as aioredis
import hashlib
import json
from claude_code_sdk import query, ClaudeCodeOptions

class RedisAgentCache:
    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
        self.redis = aioredis.from_url(redis_url)
        self.ttl = ttl

    def _make_key(self, prompt: str, model: str) -> str:
        data = json.dumps({"prompt": prompt, "model": model}, sort_keys=True)
        return f"agent:cache:{hashlib.sha256(data.encode()).hexdigest()}"

    async def get(self, prompt: str, model: str) -> str | None:
        key = self._make_key(prompt, model)
        value = await self.redis.get(key)
        return value.decode() if value else None

    async def set(self, prompt: str, model: str, result: str):
        key = self._make_key(prompt, model)
        await self.redis.setex(key, self.ttl, result.encode())

    async def invalidate_by_file(self, file_path: str):
        """Invalida todas las entradas de caché relacionadas con un archivo."""
        # Implementación: buscar keys con pattern relacionado al archivo
        pattern = f"agent:cache:*"
        async for key in self.redis.scan_iter(pattern):
            # En una implementación real, guardarías metadata del archivo
            pass

redis_cache = RedisAgentCache()

async def redis_cached_query(prompt: str, options: ClaudeCodeOptions) -> str:
    model = getattr(options, 'model', 'claude-sonnet-4-5')

    cached = await redis_cache.get(prompt, model)
    if cached:
        return cached

    result = ""
    async for message in query(prompt=prompt, options=options):
        if hasattr(message, 'result'):
            result = message.result

    await redis_cache.set(prompt, model, result)
    return result

TypeScript:

import { createClient } from "redis";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import { createHash } from "crypto";

const redisClient = createClient({ url: "redis://localhost:6379" });

function makeCacheKey(prompt: string, model: string): string {
  const data = JSON.stringify({ prompt, model });
  return `agent:cache:${createHash("sha256").update(data).digest("hex")}`;
}

async function redisCachedQuery(prompt: string, options: ClaudeCodeOptions): Promise<string> {
  const model = options.model ?? "claude-sonnet-4-5";
  const key = makeCacheKey(prompt, model);

  const cached = await redisClient.get(key);
  if (cached) {
    console.log("[CACHE HIT]");
    return cached;
  }

  let result = "";
  for await (const message of query({ prompt, options })) {
    if (message.type === "result") {
      result = message.result;
    }
  }

  await redisClient.setEx(key, 3600, result);
  return result;
}

8. Batch Processing

Agrupar múltiples tareas pequeñas en una query

En lugar de hacer N queries para N archivos, a veces es más eficiente hacer 1 query con N archivos:

Python:

from claude_code_sdk import query, ClaudeCodeOptions
import json

async def batch_classify_files(file_paths: list[str], workspace: str) -> dict[str, str]:
    """
    Clasifica múltiples archivos en una sola query (batch).
    Más eficiente que N queries individuales cuando los archivos son pequeños.
    """
    # Leer los archivos localmente para incluirlos en el prompt
    files_content = {}
    for path in file_paths[:10]:  # Limitar para no exceder context window
        try:
            with open(f"{workspace}/{path}") as f:
                content = f.read()[:500]  # Solo primeros 500 chars
            files_content[path] = content
        except Exception:
            pass

    files_json = json.dumps(files_content, indent=2)

    options = ClaudeCodeOptions(
        allowed_tools=[],  # Sin herramientas - tenemos el contenido en el prompt
        max_turns=3,
        model="claude-haiku-3-5",
    )

    prompt = f"""Para cada archivo a continuación, clasifícalo como uno de:
    - "config" (archivos de configuración)
    - "test" (archivos de prueba)
    - "business_logic" (lógica de negocio)
    - "utility" (utilidades/helpers)
    - "ui" (componentes de interfaz)

    Responde en JSON: {{"archivo.py": "tipo", ...}}

    ARCHIVOS:
    {files_json}
    """

    result = ""
    async for message in query(prompt=prompt, options=options):
        if hasattr(message, 'result'):
            result = message.result

    try:
        return json.loads(result)
    except json.JSONDecodeError:
        return {}

# Comparación de enfoque
# Batch (1 query):        $0.002, 3 segundos para 10 archivos
# Individual (10 queries): $0.010, 30 segundos para 10 archivos

Cuándo batching es más económico vs. paralelo

graph TD
    A[¿Cuántos archivos?] -->|"< 10 archivos pequeños"| B[Batch: 1 query]
    A -->|"10-100 archivos"| C[Paralelo con semaphore]
    A -->|"> 100 archivos"| D[Producer/Consumer Pipeline]

    B -->|"Pro: 1 sola query\nCon: context window limitado"| E[Resultado]
    C -->|"Pro: todos en paralelo\nCon: N queries"| E
    D -->|"Pro: escalable\nCon: más complejidad"| E

Ejemplo completo: procesar 1000 archivos eficientemente

import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions

async def process_1000_files_efficiently(directory: str) -> dict[str, str]:
    """
    Estrategia óptima para procesar 1000 archivos:
    1. Agrupamos en batches de 10 (1 query por batch)
    2. Ejecutamos 10 batches en paralelo
    3. Total: 100 queries en paralelo, cada una con 10 archivos
    """
    all_files = list(Path(directory).glob("**/*.py"))
    print(f"Total archivos: {len(all_files)}")

    BATCH_SIZE = 10
    MAX_CONCURRENT_BATCHES = 10

    # Dividir en batches
    batches = [
        all_files[i:i + BATCH_SIZE]
        for i in range(0, len(all_files), BATCH_SIZE)
    ]

    semaphore = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)

    async def process_batch(batch: list[Path]) -> dict[str, str]:
        async with semaphore:
            batch_content = {}
            for f in batch:
                try:
                    content = f.read_text()[:300]
                    batch_content[str(f.name)] = content
                except Exception:
                    pass

            options = ClaudeCodeOptions(
                allowed_tools=[],
                max_turns=2,
                model="claude-haiku-3-5",
            )

            prompt = f"""Clasifica brevemente cada archivo (una línea por archivo):
{', '.join(batch_content.keys())}

Responde: nombre_archivo: descripción_una_linea"""

            result = ""
            async for message in query(prompt=prompt, options=options):
                if hasattr(message, 'result'):
                    result = message.result

            # Parsear resultado simple
            classifications = {}
            for line in result.strip().split('\n'):
                if ':' in line:
                    parts = line.split(':', 1)
                    if len(parts) == 2:
                        classifications[parts[0].strip()] = parts[1].strip()

            return classifications

    results: dict[str, str] = {}
    batch_results = await asyncio.gather(*[process_batch(b) for b in batches])

    for batch_result in batch_results:
        results.update(batch_result)

    print(f"Procesados {len(results)} archivos")
    return results

9. Optimización de Tokens

Reducir el system_prompt a lo esencial

Cada token en el system_prompt se paga en cada query. Un system_prompt de 2000 tokens vs. uno de 200 tokens puede significar 10x más gasto:

# Versión no optimizada: 2000 tokens
BAD_SYSTEM_PROMPT = """
Eres un asistente de programación muy experimentado que ha trabajado durante muchos años
en la industria del software. Tienes experiencia en Python, JavaScript, TypeScript, Go,
Rust, Java, C++, y muchos otros lenguajes. Siempre escribes código limpio y bien documentado.
Sigues las mejores prácticas de la industria. Aplicas los principios SOLID. Escribes tests.
[... 1800 tokens más de instrucciones redundantes ...]
"""

# Versión optimizada: 200 tokens
GOOD_SYSTEM_PROMPT = """
Eres un revisor de código experto. Prioridades:
1. Seguridad (vulnerabilidades OWASP Top 10)
2. Principios SOLID
3. Complejidad O(n)
Responde: problema | severidad | línea | fix sugerido
"""

Comprimir outputs largos de herramientas

Si una herramienta retorna 10,000 tokens pero solo necesitas 500, estás pagando por tokens innecesarios en el contexto:

options = ClaudeCodeOptions(
    system_prompt="""Cuando uses herramientas:
    - Al leer archivos, especifica el rango de líneas relevantes (no leas archivos completos)
    - Al usar Grep, usa patrones específicos para obtener pocas coincidencias
    - Si un archivo es largo, lee solo las funciones relevantes
    - Nunca leas más de 200 líneas consecutivas a la vez
    """,
)

Resumen periódico en sesiones largas

En sessions con muchos turns, el contexto crece y se vuelve costoso. Estrategia: resumir el progreso periódicamente.

Python:

from claude_code_sdk import query, ClaudeCodeOptions

async def long_session_with_summarization(initial_task: str, workspace: str) -> str:
    """
    Para tareas largas, divide en subtareas y resume el progreso.
    Evita contextos enormes que consumen muchos tokens.
    """

    # Fase 1: Planificar
    plan_options = ClaudeCodeOptions(
        cwd=workspace,
        allowed_tools=["View", "GlobTool"],
        max_turns=5,
        model="claude-haiku-3-5",
    )

    plan = ""
    async for message in query(
        prompt=f"Crea un plan de 5 pasos para: {initial_task}. Solo el plan, sin ejecutar.",
        options=plan_options
    ):
        if hasattr(message, 'result'):
            plan = message.result

    # Fase 2: Ejecutar paso a paso (cada fase con contexto fresco)
    progress = []
    steps = [line for line in plan.split('\n') if line.strip()][:5]

    for step in steps:
        step_context = "\n".join([
            f"Tarea original: {initial_task}",
            f"Plan:\n{plan}",
            f"Progreso hasta ahora:\n{chr(10).join(progress[-2:])}",  # Solo últimos 2 pasos
            f"Paso actual a ejecutar: {step}",
        ])

        step_options = ClaudeCodeOptions(
            cwd=workspace,
            allowed_tools=["View", "GlobTool", "GrepTool", "Edit"],
            max_turns=10,
        )

        step_result = ""
        async for message in query(prompt=step_context, options=step_options):
            if hasattr(message, 'result'):
                step_result = message.result

        # Resumir el resultado del paso (no acumular el texto completo)
        progress.append(f"✓ {step[:50]}: {step_result[:100]}")

    return "\n".join(progress)

Estimación de tokens antes de ejecutar

def estimate_tokens(text: str) -> int:
    """Estimación rápida: ~4 caracteres por token en inglés, ~2-3 en español."""
    return len(text) // 3  # Conservador para español

def will_exceed_context(prompt: str, system_prompt: str, max_context: int = 100_000) -> bool:
    estimated = estimate_tokens(prompt) + estimate_tokens(system_prompt)
    return estimated > max_context * 0.8  # 80% del límite como buffer

# Uso
if will_exceed_context(my_prompt, my_system_prompt):
    # Dividir la tarea en partes más pequeñas
    print("La tarea es demasiado grande, dividiendo...")

10. Monitoreo de Costos en Producción

Dashboard de costos por agente y usuario

Python con Prometheus:

from prometheus_client import Counter, Histogram, start_http_server
from claude_code_sdk import query, ClaudeCodeOptions

# Métricas de Prometheus
agent_queries_total = Counter(
    'agent_queries_total',
    'Total de queries al agente',
    ['agent_name', 'user_id', 'model', 'status']
)

agent_cost_total = Counter(
    'agent_cost_usd_total',
    'Costo total en USD',
    ['agent_name', 'user_id', 'model']
)

agent_duration_seconds = Histogram(
    'agent_duration_seconds',
    'Duración de queries en segundos',
    ['agent_name', 'model'],
    buckets=[1, 5, 10, 30, 60, 120, 300]
)

class MonitoredAgent:
    def __init__(self, agent_name: str):
        self.agent_name = agent_name

    async def run(self, user_id: str, task: str, options: ClaudeCodeOptions) -> str:
        import time
        model = getattr(options, 'model', 'unknown')
        start_time = time.time()
        status = "success"

        try:
            result = ""
            total_cost = 0.0

            async for message in query(prompt=task, options=options):
                if hasattr(message, 'cost_usd') and message.cost_usd:
                    total_cost += message.cost_usd
                if hasattr(message, 'result'):
                    result = message.result

            # Registrar costo
            agent_cost_total.labels(
                agent_name=self.agent_name,
                user_id=user_id,
                model=model
            ).inc(total_cost)

            return result

        except Exception as e:
            status = "error"
            raise
        finally:
            duration = time.time() - start_time
            agent_queries_total.labels(
                agent_name=self.agent_name,
                user_id=user_id,
                model=model,
                status=status
            ).inc()
            agent_duration_seconds.labels(
                agent_name=self.agent_name,
                model=model
            ).observe(duration)

# Iniciar servidor de métricas en puerto 9090
start_http_server(9090)

# Uso
code_reviewer = MonitoredAgent("code_reviewer")
result = await code_reviewer.run(
    user_id="user123",
    task="Revisa src/main.py",
    options=ClaudeCodeOptions(allowed_tools=["View"], max_turns=5)
)

Alertas cuando el costo supera umbral

Python con alertas por email:

import smtplib
from email.mime.text import MIMEText
import os

DAILY_COST_ALERT_THRESHOLD_USD = 50.0

async def check_and_alert_costs(user_id: str, current_daily_cost: float):
    """Envía alerta si el costo diario supera el umbral."""
    if current_daily_cost > DAILY_COST_ALERT_THRESHOLD_USD:
        subject = f"[ALERTA] Costo de agente alto: ${current_daily_cost:.2f}"
        body = f"""
El usuario {user_id} ha superado el umbral de costo diario.

Costo actual del día: ${current_daily_cost:.2f}
Umbral configurado: ${DAILY_COST_ALERT_THRESHOLD_USD:.2f}

Acciones posibles:
- Revisar si hay queries ineficientes
- Verificar si hay uso no autorizado
- Considerar aumentar el umbral si el uso es legítimo
        """

        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = os.environ["ALERT_EMAIL_FROM"]
        msg['To'] = os.environ["ALERT_EMAIL_TO"]

        with smtplib.SMTP(os.environ["SMTP_HOST"]) as server:
            server.send_message(msg)

        print(f"[ALERT] Email enviado: costo ${current_daily_cost:.2f} para {user_id}")

Cost tracker completo

Python:

import json
from datetime import datetime, timezone
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions

class CostTracker:
    def __init__(self, log_path: str):
        self.log_path = Path(log_path)
        self.log_path.parent.mkdir(parents=True, exist_ok=True)

    def record(self, user_id: str, agent_name: str, model: str, cost_usd: float, turns: int):
        entry = {
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "user_id": user_id,
            "agent_name": agent_name,
            "model": model,
            "cost_usd": cost_usd,
            "turns": turns,
            "date": datetime.now().strftime("%Y-%m-%d"),
        }
        with open(self.log_path, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def daily_report(self, date: str | None = None) -> dict:
        target_date = date or datetime.now().strftime("%Y-%m-%d")
        entries = []

        if self.log_path.exists():
            with open(self.log_path) as f:
                for line in f:
                    entry = json.loads(line)
                    if entry["date"] == target_date:
                        entries.append(entry)

        # Agrupar por agente
        by_agent: dict[str, dict] = {}
        for entry in entries:
            agent = entry["agent_name"]
            if agent not in by_agent:
                by_agent[agent] = {"total_cost": 0.0, "queries": 0}
            by_agent[agent]["total_cost"] += entry["cost_usd"]
            by_agent[agent]["queries"] += 1

        total = sum(e["cost_usd"] for e in entries)

        return {
            "date": target_date,
            "total_cost_usd": total,
            "total_queries": len(entries),
            "by_agent": by_agent,
        }

    def monthly_report(self, year: int, month: int) -> dict:
        prefix = f"{year}-{month:02d}"
        entries = []

        if self.log_path.exists():
            with open(self.log_path) as f:
                for line in f:
                    entry = json.loads(line)
                    if entry["date"].startswith(prefix):
                        entries.append(entry)

        total = sum(e["cost_usd"] for e in entries)
        by_model: dict[str, float] = {}
        for entry in entries:
            model = entry["model"]
            by_model[model] = by_model.get(model, 0.0) + entry["cost_usd"]

        return {
            "period": f"{year}-{month:02d}",
            "total_cost_usd": total,
            "total_queries": len(entries),
            "by_model": by_model,
        }

# Uso
tracker = CostTracker("/var/log/agent/costs.jsonl")

async def tracked_agent(user_id: str, agent_name: str, task: str) -> str:
    options = ClaudeCodeOptions(
        allowed_tools=["View", "GlobTool"],
        max_turns=10,
        model="claude-haiku-3-5",
    )

    total_cost = 0.0
    total_turns = 0
    result = ""

    async for message in query(prompt=task, options=options):
        if hasattr(message, 'cost_usd') and message.cost_usd:
            total_cost += message.cost_usd
        if hasattr(message, 'num_turns'):
            total_turns = message.num_turns
        if hasattr(message, 'result'):
            result = message.result

    tracker.record(
        user_id=user_id,
        agent_name=agent_name,
        model="claude-haiku-3-5",
        cost_usd=total_cost,
        turns=total_turns,
    )

    # Verificar alerta de costo
    daily = tracker.daily_report()
    if daily["total_cost_usd"] > DAILY_COST_ALERT_THRESHOLD_USD:
        await check_and_alert_costs(user_id, daily["total_cost_usd"])

    return result

# Reporte mensual automatizado
async def generate_monthly_report():
    now = datetime.now()
    report = tracker.monthly_report(now.year, now.month)
    print(f"\n=== Reporte {report['period']} ===")
    print(f"Costo total: ${report['total_cost_usd']:.4f}")
    print(f"Queries totales: {report['total_queries']}")
    print("Por modelo:")
    for model, cost in sorted(report['by_model'].items(), key=lambda x: -x[1]):
        print(f"  {model}: ${cost:.4f}")

Resumen: Checklist de Optimización

graph LR
    subgraph I["Impacto Alto"]
        I1[Selección correcta de modelo]
        I2[Prompt caching activado]
        I3[Herramientas mínimas]
    end

    subgraph M["Impacto Medio"]
        M1[Paralelismo con semaphore]
        M2[Cache de resultados Redis]
        M3[Batch processing]
    end

    subgraph B["Impacto Bajo pero acumulativo"]
        B1[System_prompt compacto]
        B2[Grep antes de Read]
        B3[Compresión de outputs]
    end

    I -->|"Implementar primero"| M
    M -->|"Luego"| B
TécnicaAhorro típicoEsfuerzo
Modelo correcto5-20x costoBajo
Prompt caching50-90% input tokensBajo
ParalelismoTiempo ÷ N workersMedio
Cache de resultados100% si hitMedio
System_prompt compacto10-30% tokensBajo
Grep antes de Read20-40% turnsBajo
Batch processing50-80% queriesMedio

La optimización más impactante es seleccionar el modelo correcto. Un Haiku en lugar de Opus para tareas simples puede reducir el costo hasta 60x. El segundo impacto mayor es el prompt caching, especialmente si tienes un system_prompt largo que se repite.

El siguiente capítulo cubre cómo integrar estos agentes optimizados en frameworks web como FastAPI, Express y Next.js.