Chapter 13: Error Handling and Resilience

By: Artiko

An agent that fails in production and cannot recover is useless. This chapter turns fragile agents into robust systems that handle errors, retry intelligently, and fail in a controlled way when they have to fail.


1. Types of Errors in the SDK

Complete error taxonomy

The Claude Code SDK can raise several kinds of errors, each with a different recovery strategy. Understanding the nature of each error is the first step toward handling it correctly.

graph TD
    A[SDK error] --> B{Type?}
    B -->|CLINotFoundError| C[Claude Code not installed]
    B -->|CLIConnectionError| D[Connection failed]
    B -->|ProcessError| E[Process exited abnormally]
    B -->|API Error| F{HTTP status?}
    B -->|TimeoutError| G[Timed out]
    B -->|ToolError| H[Tool failed]

    C --> C1[Fatal: install Claude Code]
    D --> D1[Retryable with backoff]
    E --> E1{Exit code?}
    E1 -->|1| E2[Generic error - check logs]
    E1 -->|2| E3[Incorrect usage - fatal]
    E1 -->|130| E4[SIGINT - clean cancellation]

    F -->|429| F1[Rate limit - exponential backoff]
    F -->|500/503| F2[Server error - retryable]
    F -->|400| F3[Bad request - fatal]
    F -->|401| F4[Auth error - check API key]

    G --> G1[Cancel and retry with a longer timeout]
    H --> H1[Agent can recover on its own]

    style C1 fill:#ff6b6b,color:#fff
    style D1 fill:#ffa94d,color:#fff
    style F1 fill:#ffa94d,color:#fff
    style F3 fill:#ff6b6b,color:#fff
    style F4 fill:#ff6b6b,color:#fff
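
The decision tree above can be condensed into a lookup table. What follows is a minimal sketch, not part of the SDK: `Action`, `classify_failure`, and the two tables are hypothetical names introduced here for illustration, and the exit codes and HTTP statuses are only the ones the diagram names. Anything not listed is treated conservatively.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    """Hypothetical recovery decisions (not an SDK type)."""
    RETRY = "retry"      # transient: retry with backoff
    FATAL = "fatal"      # retrying cannot help
    CANCEL = "cancel"    # clean cancellation, not a failure
    INSPECT = "inspect"  # check the logs before deciding


# Exit codes taken from the diagram above.
EXIT_CODE_ACTIONS = {
    1: Action.INSPECT,
    2: Action.FATAL,
    130: Action.CANCEL,
}

# HTTP statuses taken from the diagram above.
HTTP_STATUS_ACTIONS = {
    429: Action.RETRY,
    500: Action.RETRY,
    503: Action.RETRY,
    400: Action.FATAL,
    401: Action.FATAL,
}


def classify_failure(
    exit_code: Optional[int] = None,
    http_status: Optional[int] = None,
) -> Action:
    """Map a failure to a recovery decision; the HTTP status wins if both are given."""
    if http_status is not None:
        # Unknown statuses are treated as fatal rather than retried blindly.
        return HTTP_STATUS_ACTIONS.get(http_status, Action.FATAL)
    if exit_code is not None:
        # Unknown exit codes need a look at the logs.
        return EXIT_CODE_ACTIONS.get(exit_code, Action.INSPECT)
    return Action.FATAL


print(classify_failure(http_status=429))  # Action.RETRY
print(classify_failure(exit_code=130))    # Action.CANCEL
```

Keeping the policy in data rather than in nested `if` branches makes it easy to review and to extend when a new status code needs its own treatment.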

CLINotFoundError

Raised when the claude binary is not installed or is not on the PATH:

# The SDK exports its exception classes from the package root
from claude_code_sdk import query, ClaudeCodeOptions, CLINotFoundError


async def safe_query_with_install_check(prompt: str, cwd: str):
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            yield message

    except CLINotFoundError as e:
        print("FATAL ERROR: Claude Code is not installed.")
        print("Install with: npm install -g @anthropic-ai/claude-code")
        print(f"Details: {e}")
        raise SystemExit(1)  # Retrying is pointless


def verify_cli_available() -> bool:
    """Checks that the CLI is available before trying to use the SDK."""
    import subprocess
    try:
        result = subprocess.run(
            ["claude", "--version"],
            capture_output=True, text=True, timeout=5
        )
        return result.returncode == 0
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False

CLIConnectionError

Raised when the SDK cannot connect to the Claude Code process. Usually transient:

import asyncio
from claude_code_sdk import CLIConnectionError, CLINotFoundError


async def query_with_connection_retry(prompt: str, cwd: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=cwd)
            ):
                yield message
            return  # Success

        except CLINotFoundError:
            # Subclass of CLIConnectionError, but a missing binary is never transient
            raise

        except CLIConnectionError:
            if attempt == max_retries - 1:
                raise  # Last attempt: propagate the error

            # A retry restarts the stream; messages already yielded are not withdrawn
            wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
            print(f"Connection error (attempt {attempt + 1}/{max_retries}). Retrying in {wait}s...")
            await asyncio.sleep(wait)

ProcessError

The Claude Code process exited with a non-zero exit code:

from claude_code_sdk import ProcessError


class RateLimitError(Exception):
    """Anthropic rate limit error."""


async def handle_process_errors(prompt: str, cwd: str):
    """Handles ProcessError with detailed diagnostics."""
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            yield message

    except ProcessError as e:
        exit_code = getattr(e, "exit_code", -1)
        stderr = getattr(e, "stderr", "") or ""

        if exit_code == 130:
            # SIGINT: normal cancellation
            print("Agent cancelled by user signal")
            return

        if exit_code == 1:
            # Generic agent error
            print(f"The agent exited with an error. Stderr: {stderr}")
            if "rate limit" in stderr.lower():
                raise RateLimitError("Rate limit reached") from e
            raise

        if exit_code == 2:
            # Incorrect usage: fatal, not retryable
            print(f"Configuration error: {stderr}")
            raise

        # Any other exit code: propagate instead of silently swallowing the error
        raise

APIError and rate limiting

import asyncio
# NOTE: APIError and its status_code / retry_after attributes are assumed by this
# chapter; confirm them against your installed SDK version.
from claude_code_sdk.errors import APIError


async def query_with_api_error_handling(prompt: str, cwd: str, rate_limit_retries: int = 3):
    """Handles API errors, including rate limits, with a bounded retry budget."""
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            yield message

    except APIError as e:
        status_code = getattr(e, "status_code", 0)

        if status_code == 429:
            if rate_limit_retries <= 0:
                raise  # Retry budget exhausted: stop instead of looping forever

            # Rate limit: wait, then retry
            retry_after = getattr(e, "retry_after", 60)
            print(f"Rate limit reached. Waiting {retry_after}s...")
            await asyncio.sleep(retry_after)
            # Retry recursively; the decreasing budget bounds the recursion
            async for message in query_with_api_error_handling(
                prompt, cwd, rate_limit_retries - 1
            ):
                yield message

        elif status_code in (500, 502, 503, 504):
            # Server errors: the caller may retry these
            print(f"Server error ({status_code}). The service may be degraded.")
            raise

        elif status_code == 401:
            # Authentication: fatal
            print("ERROR: invalid or expired API key")
            raise SystemExit(1)

        elif status_code == 400:
            # Bad request: a bug in the client code
            print(f"Request error: {e}")
            raise

        else:
            raise

2. Retry with Exponential Backoff

Implementation from scratch

import asyncio
import random
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RetryConfig:
    max_retries: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_exceptions: tuple = (Exception,)


def calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Computes the delay with exponential backoff and optional jitter."""
    delay = min(
        config.initial_delay * (config.exponential_base ** attempt),
        config.max_delay
    )
    if config.jitter:
        # ±20% jitter to avoid the "thundering herd" effect.
        # Applied after the cap, so the result can exceed max_delay by up to 20%.
        jitter_amount = delay * 0.2
        delay += random.uniform(-jitter_amount, jitter_amount)
    return max(0, delay)


async def retry_async(
    func: Callable,
    config: RetryConfig,
    *args,
    on_retry: Optional[Callable] = None,
    **kwargs
):
    """Runs an async function with automatic retries."""
    last_exception = None

    # One initial attempt plus up to max_retries retries
    for attempt in range(config.max_retries + 1):
        try:
            return await func(*args, **kwargs)

        except config.retryable_exceptions as e:
            last_exception = e

            if attempt == config.max_retries:
                print(f"Giving up after {config.max_retries + 1} attempts")
                raise

            delay = calculate_delay(attempt, config)

            if on_retry:
                on_retry(attempt + 1, e, delay)

            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)

    # Only reachable if max_retries is negative and the loop never ran
    raise last_exception or ValueError("max_retries must be >= 0")


# Agent-specific version
from claude_code_sdk import (
    query, ClaudeCodeOptions, AssistantMessage, ResultMessage, CLIConnectionError
)
# NOTE: APIError is assumed by this chapter; confirm it exists in your SDK version.
from claude_code_sdk.errors import APIError


AGENT_RETRY_CONFIG = RetryConfig(
    max_retries=3,
    initial_delay=2.0,
    max_delay=120.0,
    exponential_base=2.0,
    jitter=True,
    retryable_exceptions=(CLIConnectionError, APIError, ConnectionError)
)


async def resilient_query(prompt: str, cwd: str) -> list:
    """Wrapper around query() with built-in retries."""
    messages = []

    async def _run():
        nonlocal messages
        messages = []  # Start from scratch on every attempt
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

    return await retry_async(
        _run,
        AGENT_RETRY_CONFIG,
        on_retry=lambda attempt, err, delay: print(
            f"[Retry {attempt}] Error: {err} - waiting {delay:.1f}s"
        )
    )
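
To make the arithmetic concrete, here is a small standalone sketch of the delay schedule that the formula `initial_delay * exponential_base ** attempt`, capped at `max_delay`, produces for the agent values used above (2.0, 2.0, 120.0). `backoff_schedule` and `jitter_bounds` are hypothetical helper names introduced only for this illustration.

```python
from typing import List, Tuple


def backoff_schedule(
    initial_delay: float,
    exponential_base: float,
    max_delay: float,
    attempts: int,
) -> List[float]:
    """Delay before each retry, without jitter, capped at max_delay."""
    return [
        min(initial_delay * exponential_base ** attempt, max_delay)
        for attempt in range(attempts)
    ]


def jitter_bounds(delay: float, fraction: float = 0.2) -> Tuple[float, float]:
    """Lowest and highest delay once ±fraction jitter is applied."""
    return (delay * (1 - fraction), delay * (1 + fraction))


schedule = backoff_schedule(2.0, 2.0, 120.0, attempts=8)
print(schedule)  # [2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 120.0, 120.0]

low, high = jitter_bounds(120.0)
print(f"{low:.1f}s to {high:.1f}s")  # 96.0s to 144.0s
```

With `max_retries=3` only the first three entries are ever used (2s, 4s, 8s). The last line shows a side effect worth knowing: when jitter is added after the cap, a capped delay can still land above `max_delay`.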

Using tenacity (Python library)

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
    after_log,
)
import logging

logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((CLIConnectionError, APIError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO),
    reraise=True
)
async def query_with_tenacity(prompt: str, cwd: str):
    """Agent with automatic retries using tenacity."""
    messages = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        messages.append(message)
    return messages

TypeScript with p-retry

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import pRetry, { AbortError } from "p-retry";

interface RetryOptions {
  maxRetries?: number;
  minTimeout?: number;
  maxTimeout?: number;
}

async function resilientQuery(
  prompt: string,
  cwd: string,
  options: RetryOptions = {}
): Promise<unknown[]> {
  const { maxRetries = 3, minTimeout = 2000, maxTimeout = 60000 } = options;

  return pRetry(
    async (attemptNumber) => {
      const messages: unknown[] = [];

      try {
        for await (const message of query({
          prompt,
          options: { cwd } as ClaudeCodeOptions,
        })) {
          messages.push(message);
        }
        return messages;
      } catch (error) {
        const err = error as Error;

        // Errors that must NOT be retried
        if (err.message.includes("not installed") || err.message.includes("401")) {
          throw new AbortError(err.message);
        }

        console.warn(`Attempt ${attemptNumber} failed: ${err.message}`);
        throw error; // Retryable
      }
    },
    {
      retries: maxRetries,
      minTimeout,
      maxTimeout,
      factor: 2,
      randomize: true,
      onFailedAttempt: (error) => {
        // attemptNumber and retriesLeft are the fields p-retry sets on the error
        console.warn(
          `Attempt ${error.attemptNumber} of ${error.retriesLeft + error.attemptNumber} failed. ` +
          `Retries left: ${error.retriesLeft}`
        );
      },
    }
  );
}

3. Circuit Breaker Pattern

Why use a circuit breaker?

If the Anthropic API is down, retrying indefinitely only makes things worse. A circuit breaker "opens the circuit" after N consecutive failures and rejects new requests immediately. After a timeout it moves to a half-open state and lets trial requests through to check whether the service has recovered; the circuit closes again only once those trials succeed.

stateDiagram-v2
    [*] --> CLOSED

    CLOSED --> OPEN : failures >= threshold
    OPEN --> HALF_OPEN : timeout elapsed
    HALF_OPEN --> CLOSED : success
    HALF_OPEN --> OPEN : failure

    CLOSED : CLOSED\nNormal requests\nCounting failures
    OPEN : OPEN\nRejects everything\nWaiting for timeout
    HALF_OPEN : HALF_OPEN\nOne trial request\nDeciding next state

Implementation in Python

import asyncio
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Callable, Any


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


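@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 60.0
    # A single exception class or a tuple of classes, as accepted by `except`
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception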


@dataclass
class CircuitBreakerStats:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    rejected_requests: int = 0
    state_changes: list = field(default_factory=list)


class CircuitBreaker:
    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.config = config
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self.stats = CircuitBreakerStats()

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            elapsed = time.time() - (self._last_failure_time or 0)
            if elapsed >= self.config.timeout_seconds:
                self._transition_to(CircuitState.HALF_OPEN)
        return self._state

    def _transition_to(self, new_state: CircuitState):
        old_state = self._state
        self._state = new_state
        self.stats.state_changes.append({
            "from": old_state.value,
            "to": new_state.value,
            "time": time.time()
        })
        print(f"[CircuitBreaker:{self.name}] {old_state.value} → {new_state.value}")

    def _on_success(self):
        self.stats.successful_requests += 1
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.config.success_threshold:
                self._failure_count = 0
                self._success_count = 0
                self._transition_to(CircuitState.CLOSED)
        elif self._state == CircuitState.CLOSED:
            self._failure_count = 0

    def _on_failure(self, exc: Exception):
        self.stats.failed_requests += 1
        self._failure_count += 1
        self._last_failure_time = time.time()

        if self._state == CircuitState.HALF_OPEN:
            self._success_count = 0
            self._transition_to(CircuitState.OPEN)
        elif self._state == CircuitState.CLOSED:
            if self._failure_count >= self.config.failure_threshold:
                self._transition_to(CircuitState.OPEN)

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        self.stats.total_requests += 1

        if self.state == CircuitState.OPEN:
            self.stats.rejected_requests += 1
            raise CircuitOpenError(
                f"CircuitBreaker '{self.name}' is open. "
                f"Service considered unavailable."
            )

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except self.config.expected_exception as e:
            self._on_failure(e)
            raise

    def get_metrics(self) -> dict:
        success_rate = (
            self.stats.successful_requests / self.stats.total_requests
            if self.stats.total_requests > 0 else 0
        )
        return {
            "state": self.state.value,
            "failure_count": self._failure_count,
            "total_requests": self.stats.total_requests,
            "success_rate": f"{success_rate:.1%}",
            "rejected_requests": self.stats.rejected_requests,
        }


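class CircuitOpenError(Exception):
    """The circuit breaker is open."""
    pass


# Global circuit breaker instance for the SDK
from claude_code_sdk import CLIConnectionError
# NOTE: APIError is assumed by this chapter; confirm it exists in your SDK version.
from claude_code_sdk.errors import APIError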

anthropic_circuit_breaker = CircuitBreaker(
    name="anthropic_api",
    config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=2,
        timeout_seconds=120.0,
        expected_exception=(CLIConnectionError, APIError, ConnectionError)
    )
)


async def protected_query(prompt: str, cwd: str):
    """Query protected by the circuit breaker."""
    async def _run():
        messages = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

    try:
        return await anthropic_circuit_breaker.call(_run)
    except CircuitOpenError as e:
        print(f"Service unavailable: {e}")
        print(f"Metrics: {anthropic_circuit_breaker.get_metrics()}")
        raise
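
The state transitions are easier to verify when the clock is injected instead of read from `time` directly, because a test can then "advance" time without sleeping. A minimal sketch under stated assumptions: `MiniBreaker` is a hypothetical, stripped-down class written for this illustration (a single successful trial closes the circuit, i.e. a success threshold of 1), not a replacement for the implementation above.

```python
import time
from typing import Callable, Optional


class MiniBreaker:
    """Hypothetical, reduced circuit breaker with an injectable clock."""

    def __init__(
        self,
        failure_threshold: int,
        timeout_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.timeout_seconds:
            return "half_open"
        return "open"

    def allow(self) -> bool:
        """Whether a request may go through right now."""
        return self.state != "open"

    def record_success(self) -> None:
        # A success while closed or half-open resets everything.
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        if self.state == "half_open":
            # The trial failed: reopen and restart the timeout.
            self._opened_at = self._clock()
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


# Walk through the diagram's transitions with a fake clock.
now = [0.0]
breaker = MiniBreaker(failure_threshold=2, timeout_seconds=60.0, clock=lambda: now[0])

breaker.record_failure()
breaker.record_failure()
states = [breaker.state]      # threshold reached

now[0] = 61.0
states.append(breaker.state)  # timeout elapsed

breaker.record_failure()
states.append(breaker.state)  # trial failed

now[0] = 122.0
states.append(breaker.state)  # timeout elapsed again

breaker.record_success()
states.append(breaker.state)  # trial succeeded

print(states)  # ['open', 'half_open', 'open', 'half_open', 'closed']
```

The default clock is `time.monotonic` rather than `time.time` so that wall-clock adjustments cannot shorten or lengthen the open period.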

TypeScript: Circuit Breaker

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

interface CircuitBreakerOptions {
  failureThreshold: number;
  successThreshold: number;
  timeoutMs: number;
}

class CircuitBreaker {
  private state: CircuitState = "CLOSED";
  private failureCount = 0;
  private successCount = 0;
  private lastFailureTime = 0;

  constructor(
    private readonly name: string,
    private readonly options: CircuitBreakerOptions
  ) {}

  private get currentState(): CircuitState {
    if (this.state === "OPEN") {
      const elapsed = Date.now() - this.lastFailureTime;
      if (elapsed >= this.options.timeoutMs) {
        this.state = "HALF_OPEN";
        console.log(`[CB:${this.name}] OPEN → HALF_OPEN`);
      }
    }
    return this.state;
  }

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.currentState === "OPEN") {
      throw new Error(`Circuit breaker '${this.name}' is open`);
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    if (this.state === "HALF_OPEN") {
      this.successCount++;
      if (this.successCount >= this.options.successThreshold) {
        this.state = "CLOSED";
        this.failureCount = 0;
        this.successCount = 0;
        console.log(`[CB:${this.name}] HALF_OPEN → CLOSED`);
      }
    } else {
      this.failureCount = 0;
    }
  }

  private onFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();

    if (this.state === "HALF_OPEN" || this.failureCount >= this.options.failureThreshold) {
      this.state = "OPEN";
      console.log(`[CB:${this.name}] → OPEN (failures: ${this.failureCount})`);
    }
  }

  getState(): CircuitState {
    return this.currentState;
  }
}

const anthropicCB = new CircuitBreaker("anthropic", {
  failureThreshold: 5,
  successThreshold: 2,
  timeoutMs: 120_000,
});

async function protectedQuery(prompt: string, cwd: string) {
  return anthropicCB.execute(async () => {
    const messages: unknown[] = [];
    for await (const message of query({
      prompt,
      options: { cwd } as ClaudeCodeOptions,
    })) {
      messages.push(message);
    }
    return messages;
  });
}

4. Timeout Handling

asyncio.wait_for in Python

import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage


async def query_with_timeout(
    prompt: str,
    cwd: str,
    timeout_seconds: float = 300.0  # 5 minutes by default
):
    """Query with an overall timeout and automatic cleanup."""

    async def _run_agent():
        messages = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

    try:
        return await asyncio.wait_for(_run_agent(), timeout=timeout_seconds)

    except asyncio.TimeoutError:
        print(f"Timeout: the agent took longer than {timeout_seconds}s")
        # Cleanup: set aside partial changes if needed
        await cleanup_partial_changes(cwd)
        raise


async def query_with_per_tool_timeout(
    prompt: str,
    cwd: str,
    total_timeout: float = 300.0,
    tool_timeout: float = 30.0
):
    """Inactivity timeout: aborts if no message arrives within tool_timeout seconds."""
    messages = []
    stream = query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ).__aiter__()

    try:
        async with asyncio.timeout(total_timeout):  # Requires Python 3.11+
            while True:
                try:
                    # Bound the wait for each message: a hung tool shows up as silence
                    message = await asyncio.wait_for(
                        stream.__anext__(), timeout=tool_timeout
                    )
                except StopAsyncIteration:
                    break  # The agent finished normally
                messages.append(message)

    except asyncio.TimeoutError:
        print(
            f"Timeout: no activity for {tool_timeout}s (possible hung tool) "
            f"or total time of {total_timeout}s exceeded"
        )
        raise

    return messages


async def cleanup_partial_changes(cwd: str):
    """Sets aside partial changes if the agent was interrupted."""
    import subprocess
    from pathlib import Path

    git_dir = Path(cwd) / ".git"
    if git_dir.exists():
        try:
            # Stash partial changes, including files the agent created
            result = subprocess.run(
                ["git", "stash", "push", "--include-untracked",
                 "-m", "agent-partial-changes-timeout"],
                cwd=cwd,
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode == 0:
                print("Partial changes saved to git stash")
        except subprocess.TimeoutExpired:
            print("git stash could not be completed")

AbortController in TypeScript

import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

async function queryWithTimeout(
  prompt: string,
  cwd: string,
  timeoutMs: number = 300_000
): Promise<unknown[]> {
  const controller = new AbortController();
  const messages: unknown[] = [];

  // Abort automatically on timeout
  const timeoutId = setTimeout(() => {
    controller.abort(new Error(`Timeout after ${timeoutMs}ms`));
  }, timeoutMs);

  try {
    for await (const message of query({
      prompt,
      // Pass the controller so the abort actually reaches the running query
      options: { cwd, abortController: controller } as ClaudeCodeOptions,
    })) {
      if (controller.signal.aborted) {
        throw new Error("Agent cancelled by timeout");
      }
      messages.push(message);
    }

    return messages;
  } catch (error) {
    if (controller.signal.aborted) {
      console.error(`Agent timed out: ${timeoutMs}ms`);
      await cleanupPartialChanges(cwd);
    }
    throw error;
  } finally {
    clearTimeout(timeoutId);
  }
}

async function cleanupPartialChanges(cwd: string): Promise<void> {
  const { execSync } = await import("child_process");
  try {
    execSync("git stash push -m agent-timeout-cleanup", {
      cwd,
      stdio: "pipe",
      timeout: 10_000,
    });
    console.log("Partial changes saved to stash");
  } catch {
    console.warn("git stash could not be completed");
  }
}

5. Error Handling in Tools

What happens when Bash returns exit_code != 0

Claude's agent can recover from many tool errors on its own, but you need a way to detect when it does not:

from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage


async def monitor_tool_errors(prompt: str, cwd: str):
    """Monitors tool calls and the final result while the agent runs."""
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                # Only tool-use blocks carry a tool name
                if not hasattr(block, "name"):
                    continue

                tool_name = block.name
                tool_input = getattr(block, "input", {})

                # Flag risky patterns in the tool input
                if tool_name == "Bash":
                    cmd = tool_input.get("command", "")
                    if is_dangerous_command(cmd):
                        print(f"⚠️  Potentially dangerous command: {cmd}")

        elif isinstance(message, ResultMessage):
            # is_error covers every error subtype, not just one literal value
            if message.is_error:
                print(f"The agent reported an error ({message.subtype}) while completing the task")

        yield message


def is_dangerous_command(cmd: str) -> bool:
    """Detects commands that could be destructive."""
    dangerous_patterns = [
        "rm -rf /",
        "dd if=/dev/zero",
        "mkfs",
        "> /dev/sda",
        ":(){ :|:& };:"  # Fork bomb
    ]
    cmd_lower = cmd.lower()
    return any(p in cmd_lower for p in dangerous_patterns)
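
One way to decide that the agent is no longer recovering on its own is to count consecutive failed tool calls and stop once a limit is reached. A minimal sketch, assuming a hypothetical `ConsecutiveErrorTracker` class; how each tool outcome is obtained (the `is_error` flag passed to `record`) is left to the caller and is not something this sketch takes from the SDK.

```python
class ConsecutiveErrorTracker:
    """Hypothetical helper: trips after N failed tool calls in a row."""

    def __init__(self, max_consecutive: int = 3) -> None:
        self.max_consecutive = max_consecutive
        self.consecutive = 0  # current run of failures
        self.total = 0        # all failures seen so far

    def record(self, is_error: bool) -> bool:
        """Record one tool outcome; return True once the limit is reached."""
        if is_error:
            self.consecutive += 1
            self.total += 1
        else:
            # Any success means the agent recovered, so the run starts over.
            self.consecutive = 0
        return self.consecutive >= self.max_consecutive


tracker = ConsecutiveErrorTracker(max_consecutive=3)
outcomes = [True, True, False, True, True, True]  # True = the tool call failed
tripped = [tracker.record(outcome) for outcome in outcomes]

print(tripped)        # [False, False, False, False, False, True]
print(tracker.total)  # 5
```

Counting consecutive failures rather than total failures tolerates the occasional failed command that the agent fixes on the next step, while still catching a loop of repeated failures.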

PostToolUse hook to detect errors

from claude_code_sdk import ClaudeCodeOptions
from typing import Callable, Optional


def create_error_detecting_hook(on_error: Optional[Callable] = None):
    """Hook that detects tool errors and reports them."""

    # Plain substring matching: expect false positives (e.g. a file named error.log)
    error_keywords = [
        "error", "failed", "exception", "traceback",
        "no such file", "permission denied", "command not found",
        "syntax error", "import error"
    ]

    def post_tool_use(tool_name: str, tool_input: dict, tool_output: str) -> None:
        output_lower = tool_output.lower()
        is_error = any(kw in output_lower for kw in error_keywords)

        if is_error:
            error_info = {
                "tool": tool_name,
                "input": tool_input,
                "output_preview": tool_output[:200],
            }

            if on_error:
                on_error(error_info)
            else:
                print(f"⚠️  Error detected in {tool_name}: {tool_output[:100]}")

    return post_tool_use


# Usage with the SDK
def build_options_with_error_detection(cwd: str) -> ClaudeCodeOptions:
    errors_log = []

    def log_error(error_info: dict):
        errors_log.append(error_info)
        print(f"Tool error logged: {error_info['tool']}")

    # NOTE: the post_tool_use_hook option name and the callback signature are
    # assumed by this chapter; check how your SDK version registers hooks.
    return ClaudeCodeOptions(
        cwd=cwd,
        post_tool_use_hook=create_error_detecting_hook(on_error=log_error)
    )

6. Rollback and Compensation

Automatic backup before editing

import shutil
from pathlib import Path
from datetime import datetime
from typing import Optional
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage


class AgentWithRollback:
    """Agent that backs up the project automatically and can roll back."""

    def __init__(self, project_dir: str):
        self.project_dir = Path(project_dir)
        self.backup_dir: Optional[Path] = None

    def create_backup(self) -> Path:
        """Creates a backup of the project directory."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = Path(f"/tmp/agent_backup_{timestamp}")
        shutil.copytree(self.project_dir, backup_path)
        self.backup_dir = backup_path
        print(f"Backup created at: {backup_path}")
        return backup_path

    def rollback(self):
        """Restores the project from the backup."""
        if not self.backup_dir or not self.backup_dir.exists():
            raise ValueError("No backup available for rollback")

        # Delete the current state
        shutil.rmtree(self.project_dir)
        # Restore the backup
        shutil.copytree(self.backup_dir, self.project_dir)
        print(f"Rollback completed from: {self.backup_dir}")

    def cleanup_backup(self):
        """Deletes the backup (call once the agent has finished successfully)."""
        if self.backup_dir and self.backup_dir.exists():
            shutil.rmtree(self.backup_dir)
            print(f"Backup removed: {self.backup_dir}")
            self.backup_dir = None

    async def run_with_rollback(self, prompt: str) -> list:
        """Runs the agent and rolls back automatically on error."""
        self.create_backup()
        messages = []
        success = False

        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=str(self.project_dir))
            ):
                messages.append(message)

            # Check that the project is still valid
            if not self._validate_project():
                raise ValueError("The project was left in an invalid state")

            success = True
            return messages

        except Exception as e:
            print(f"Error while running the agent: {e}")
            print("Starting rollback...")
            self.rollback()
            raise

        finally:
            # The backup is kept after a failure so it can be inspected
            if success:
                self.cleanup_backup()

    def _validate_project(self) -> bool:
        """Checks that the project is in a valid state."""
        # Example: check that the tests still pass
        import subprocess
        result = subprocess.run(
            ["python", "-m", "pytest", "--tb=no", "-q"],
            cwd=self.project_dir,
            capture_output=True,
            timeout=60
        )
        return result.returncode == 0

Transactional rollback with Git

import subprocess
from pathlib import Path
from typing import Optional


class GitTransactionalAgent:
    """Agent with transactional behavior built on git."""

    def __init__(self, project_dir: str):
        self.cwd = project_dir
        self._stash_ref: Optional[str] = None

    def _git(self, *args, check=True) -> str:
        result = subprocess.run(
            ["git", *args],
            cwd=self.cwd,
            capture_output=True,
            text=True,
            timeout=30
        )
        if check and result.returncode != 0:
            raise subprocess.CalledProcessError(
                result.returncode, ["git", *args],
                output=result.stdout, stderr=result.stderr
            )
        return result.stdout.strip()

    def begin_transaction(self) -> str:
        """Creates a checkpoint commit to return to."""
        # Save the current state
        self._git("add", "-A")
        try:
            self._git("commit", "-m", "agent-transaction-checkpoint [skip ci]")
            checkpoint = self._git("rev-parse", "HEAD")
            print(f"Checkpoint created: {checkpoint[:8]}")
            return checkpoint
        except subprocess.CalledProcessError:
            # Nothing to commit: the current HEAD is the checkpoint
            checkpoint = self._git("rev-parse", "HEAD")
            return checkpoint

    def rollback_to_checkpoint(self, checkpoint: str):
        """Reverts to the given checkpoint."""
        # Undo commits and tracked changes made after the checkpoint
        self._git("reset", "--hard", checkpoint)
        # reset --hard leaves untracked files; remove the ones the agent created
        self._git("clean", "-fd")
        print(f"Rolled back to checkpoint: {checkpoint[:8]}")

    def commit_transaction(self, message: str = "agent: applied changes"):
        """Commits the agent's changes on top of the checkpoint."""
        self._git("add", "-A")
        try:
            self._git("commit", "-m", message)
        except subprocess.CalledProcessError:
            pass  # Nothing to commit

    async def run_transactional(self, prompt: str, commit_message: str) -> list:
        from claude_code_sdk import query, ClaudeCodeOptions

        checkpoint = self.begin_transaction()
        messages = []

        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=self.cwd)
            ):
                messages.append(message)

            self.commit_transaction(commit_message)
            return messages

        except Exception as e:
            print(f"Error: {e}. Rolling back...")
            self.rollback_to_checkpoint(checkpoint)
            raise

7. Human-in-the-Loop for Errors

When to escalate to a human

flowchart TD
    A[Error en agente] --> B{¿Es recuperable?}
    B -->|Sí, reintentable| C[Retry automático]
    B -->|No sé| D{¿Intentos agotados?}
    B -->|No, fatal| E[Escalar a humano]

    C --> F{¿Éxito?}
    F -->|Sí| G[Continuar]
    F -->|No, max retries| D

    D -->|Sí| E
    D -->|No| C

    E --> H[AskUserQuestion]
    H --> I{¿Respuesta humana?}
    I -->|Continuar diferente| J[Reintentar con feedback]
    I -->|Abandonar| K[Terminar gracefully]
    I -->|Timeout| L[Fallback automático]

    style E fill:#ffa94d,color:#fff
    style K fill:#ff6b6b,color:#fff
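
The top half of the flowchart can be read as a small pure function. The sketch below is one possible encoding, not part of the SDK: `ErrorKind`, `Action`, and `next_action` are hypothetical names introduced only for illustration, and classifying a concrete exception into an `ErrorKind` is left to the caller.

```python
from enum import Enum


class ErrorKind(Enum):
    """How the caller classified the error (hypothetical names)."""
    RETRYABLE = "retryable"
    UNKNOWN = "unknown"
    FATAL = "fatal"


class Action(Enum):
    RETRY = "retry"
    ESCALATE = "escalate"


def next_action(kind: ErrorKind, attempts: int, max_retries: int) -> Action:
    """Decide the next step after a failed attempt.

    Fatal errors go straight to the human. Retryable and unknown errors
    are retried until the attempt budget is spent, then escalated.
    """
    if kind is ErrorKind.FATAL:
        return Action.ESCALATE
    if attempts >= max_retries:
        return Action.ESCALATE
    return Action.RETRY


if __name__ == "__main__":
    print(next_action(ErrorKind.RETRYABLE, attempts=1, max_retries=3))
    print(next_action(ErrorKind.UNKNOWN, attempts=3, max_retries=3))
```

Keeping the decision separate from the retry loop makes the escalation policy easy to unit-test without running an agent.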

Escalation pattern

import asyncio
from typing import Optional, Callable


class HumanInTheLoopHandler:
    """Maneja la escalación de errores al humano."""

    def __init__(
        self,
        question_timeout_seconds: float = 120.0,
        ask_question: Optional[Callable] = None
    ):
        self.timeout = question_timeout_seconds
        self._ask_question = ask_question or self._default_ask

    async def _default_ask(self, question: str) -> str:
        """Ask the user on the terminal; return "" if no answer arrives in time."""
        print(f"\n{'='*50}")
        print("⚠️  The agent needs your help:")
        print(question)
        print(f"{'='*50}")
        print("Reply (or press Enter to retry with defaults):")

        # get_running_loop() is the supported call inside a coroutine
        loop = asyncio.get_running_loop()
        try:
            return await asyncio.wait_for(
                loop.run_in_executor(None, input, "> "),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            # Only the await is abandoned: the worker thread stays blocked
            # in input() until the user eventually presses Enter
            print(f"\n[Timeout after {self.timeout}s, using the default answer]")
            return ""

    async def handle_error(
        self,
        error: Exception,
        context: dict,
        retry_callback: Callable
    ):
        """Escalate an error to the human and decide how to continue."""
        question = self._format_error_question(error, context)
        response = ((await self._ask_question(question)) or "").strip()
        choice = response.lower()

        if choice in ("", "continuar", "continue", "y", "yes"):
            # Enter, a timeout, or a plain confirmation: retry without extra guidance
            return await retry_callback(user_guidance="")

        elif choice in ("abandonar", "abandon", "n", "no", "abort"):
            raise SystemExit("Task abandoned by the user")

        else:
            # Anything else is treated as specific instructions for the agent
            return await retry_callback(user_guidance=response)

    def _format_error_question(self, error: Exception, context: dict) -> str:
        return f"""
Error encontrado: {type(error).__name__}: {error}

Contexto:
- Tarea: {context.get('prompt', 'N/A')}
- Directorio: {context.get('cwd', 'N/A')}
- Intentos realizados: {context.get('attempts', 0)}

¿Qué quieres hacer?
- Presiona Enter para reintentar
- Escribe instrucciones adicionales para el agente
- Escribe 'abandonar' para cancelar
"""


async def agent_with_hitl(prompt: str, cwd: str):
    """Agente que escala errores al humano."""
    from claude_code_sdk import query, ClaudeCodeOptions
    from claude_code_sdk.errors import CLIConnectionError

    hitl = HumanInTheLoopHandler(question_timeout_seconds=60.0)
    attempts = 0
    max_human_escalations = 2

    async def run_agent(user_guidance: str = ""):
        nonlocal attempts
        attempts += 1

        full_prompt = prompt
        if user_guidance:
            full_prompt = f"{prompt}\n\nNota adicional del usuario: {user_guidance}"

        messages = []
        async for message in query(
            prompt=full_prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

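    guidance = ""
    for escalation in range(max_human_escalations + 1):
        try:
            return await run_agent(user_guidance=guidance)
        except (CLIConnectionError, TimeoutError) as e:
            if escalation >= max_human_escalations:
                print("Maximum number of escalations reached. Aborting.")
                raise

            async def capture_guidance(user_guidance: str = "") -> str:
                # Only record the human's answer. The loop performs the retry, so a
                # failure in the retried run is caught by this same except clause
                # instead of escaping from inside the handler.
                return user_guidance

            guidance = await hitl.handle_error(
                error=e,
                context={"prompt": prompt, "cwd": cwd, "attempts": attempts},
                retry_callback=capture_guidance
            )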
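
The handler accepts an `ask_question` callable, which makes it possible to replace the terminal prompt with another channel (a chat bot, a ticket queue) or with a scripted answer in tests. The following is a minimal sketch of that idea, independent of the SDK. `ask_with_timeout`, `scripted_ask`, and `silent_ask` are hypothetical helpers, not part of any library.

```python
import asyncio
from typing import Awaitable, Callable

AskFn = Callable[[str], Awaitable[str]]


async def ask_with_timeout(
    ask: AskFn,
    question: str,
    timeout_seconds: float,
    default: str = "",
) -> str:
    """Await a human answer, falling back to `default` on timeout."""
    try:
        return await asyncio.wait_for(ask(question), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return default


async def scripted_ask(question: str) -> str:
    """Stand-in for a human that answers immediately (useful in tests)."""
    return "use the staging database"


async def silent_ask(question: str) -> str:
    """Stand-in for a human that never answers."""
    await asyncio.sleep(3600)
    return "never reached"


if __name__ == "__main__":
    print(asyncio.run(ask_with_timeout(scripted_ask, "How to proceed?", 1.0)))
    print(repr(asyncio.run(ask_with_timeout(silent_ask, "How to proceed?", 0.05))))
```

Because these askers are coroutines rather than a blocking `input()` call, the timeout actually cancels the pending question instead of leaving a thread waiting. An asker with this shape can be passed as `HumanInTheLoopHandler(ask_question=scripted_ask)`.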

8. Structured Logging

Python: structlog

# logging_config.py
import logging
from typing import Optional

import structlog


def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None):
    """Configure structured logging for the agent."""
    processors = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ]

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # Configure stdlib logging, which structlog writes through
    logging.basicConfig(
        format="%(message)s",
        level=getattr(logging, log_level.upper()),
        handlers=[
            logging.StreamHandler(),
            *([] if not log_file else [logging.FileHandler(log_file)])
        ]
    )


# agent_logger.py
import time
import uuid

import structlog
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage

logger = structlog.get_logger(__name__)


async def logged_agent_run(prompt: str, cwd: str):
    """Run the agent with structured logging of tool use, results, and errors."""
    session_id = str(uuid.uuid4())[:8]

    log = logger.bind(
        session_id=session_id,
        cwd=cwd,
        prompt_length=len(prompt)
    )

    log.info("agent_start", prompt_preview=prompt[:100])

    tool_events = []
    # monotonic() is unaffected by wall-clock adjustments, so it is safer for durations
    start_time = time.monotonic()

    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    # Only tool-use blocks carry a `name` attribute
                    if hasattr(block, "name"):
                        tool_event = {
                            "tool": block.name,
                            # Log the input keys only, never the values
                            "input_keys": list(getattr(block, "input", {}).keys())
                        }
                        tool_events.append(tool_event)
                        log.info("tool_use", **tool_event)

            elif isinstance(message, ResultMessage):
                duration = time.monotonic() - start_time
                log.info(
                    "agent_complete",
                    subtype=message.subtype,
                    duration_ms=message.duration_ms,
                    cost_usd=message.total_cost_usd,
                    num_tools=len(tool_events),
                    actual_duration_s=round(duration, 2)
                )

    except Exception as e:
        duration = time.monotonic() - start_time
        log.error(
            "agent_error",
            error_type=type(e).__name__,
            error_message=str(e),
            duration_s=round(duration, 2),
            num_tools_before_error=len(tool_events),
            exc_info=True
        )
        raise
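
Since `prompt_preview` and error messages end up in the logs, it can be worth scrubbing secrets before they are rendered. A structlog processor is a plain callable that receives `(logger, method_name, event_dict)` and returns the event dict, so a redaction step can be sketched without importing structlog at all. The key names and the `sk-` pattern below are illustrative assumptions, not an exhaustive list of what counts as sensitive.

```python
import re
from typing import Any, MutableMapping

# Assumed patterns for illustration only; adapt them to your own secrets
SECRET_PATTERN = re.compile(r"sk-[A-Za-z0-9_-]{8,}")
SENSITIVE_KEYS = {"api_key", "authorization", "token"}


def redact_secrets(
    logger: Any, method_name: str, event_dict: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
    """Processor that masks sensitive keys and secret-looking substrings."""
    for key, value in list(event_dict.items()):
        if key.lower() in SENSITIVE_KEYS:
            # The whole value is sensitive
            event_dict[key] = "[REDACTED]"
        elif isinstance(value, str):
            # Mask secret-looking tokens embedded in free text
            event_dict[key] = SECRET_PATTERN.sub("[REDACTED]", value)
    return event_dict


if __name__ == "__main__":
    event = {
        "event": "agent_start",
        "prompt_preview": "deploy with key sk-abc123456789 please",
        "api_key": "anything",
    }
    print(redact_secrets(None, "info", event))
```

To take effect, such a processor would be placed in the `processors` list before `JSONRenderer()`, so that the rendered line never contains the original value.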

TypeScript: pino

import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import pino from "pino";
import { randomUUID } from "crypto";

const logger = pino({
  level: process.env.LOG_LEVEL ?? "info",
  transport: {
    target: "pino-pretty",
    options: {
      colorize: true,
      translateTime: "SYS:standard",
    },
  },
});

async function loggedAgentRun(prompt: string, cwd: string): Promise<unknown[]> {
  const sessionId = randomUUID().slice(0, 8);
  const log = logger.child({ sessionId, cwd });
  const messages: unknown[] = [];
  const toolEvents: { tool: string; timestamp: number }[] = [];
  const startTime = Date.now();

  log.info({ promptLength: prompt.length, promptPreview: prompt.slice(0, 100) }, "agent_start");

  try {
    for await (const message of query({
      prompt,
      options: { cwd } as ClaudeCodeOptions,
    })) {
      messages.push(message);

      if (message.type === "assistant") {
        const assistantMsg = message as AssistantMessage;
        for (const block of assistantMsg.message.content) {
          if (block.type === "tool_use") {
            toolEvents.push({ tool: block.name, timestamp: Date.now() });
            log.info({ tool: block.name, inputKeys: Object.keys(block.input) }, "tool_use");
          }
        }
      } else if (message.type === "result") {
        const resultMsg = message as ResultMessage;
        log.info(
          {
            subtype: resultMsg.subtype,
            durationMs: resultMsg.duration_ms,
            costUsd: resultMsg.total_cost_usd,
            numTools: toolEvents.length,
            wallTimeMs: Date.now() - startTime,
          },
          "agent_complete"
        );
      }
    }

    return messages;
  } catch (error) {
    log.error(
      {
        errorType: (error as Error).constructor.name,
        errorMessage: (error as Error).message,
        wallTimeMs: Date.now() - startTime,
        numToolsBeforeError: toolEvents.length,
      },
      "agent_error"
    );
    throw error;
  }
}

9. Error Observability

Sentry integration

import sentry_sdk
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
from claude_code_sdk.errors import CLIConnectionError, APIError


def init_sentry(dsn: str, environment: str = "production"):
    """Inicializa Sentry para captura de errores del agente."""
    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        integrations=[AsyncioIntegration()],
        traces_sample_rate=0.1,
        profiles_sample_rate=0.1,
    )


async def monitored_agent_run(prompt: str, cwd: str, user_id: str = None):
    """Run the agent with full error monitoring."""
    with sentry_sdk.new_scope() as scope:
        scope.set_tag("agent.type", "claude-code")
        # The environment is set once in init_sentry(); a hardcoded tag here
        # would mislabel events coming from staging or development
        if user_id:
            scope.set_user({"id": user_id})

        scope.set_context("agent", {
            "cwd": cwd,
            "prompt_length": len(prompt),
        })

        with sentry_sdk.start_transaction(
            op="agent.run",
            name=f"Claude Agent: {prompt[:50]}"
        ) as transaction:
            try:
                messages = []
                async for message in query(
                    prompt=prompt,
                    options=ClaudeCodeOptions(cwd=cwd)
                ):
                    messages.append(message)
                    if isinstance(message, ResultMessage):
                        # total_cost_usd may be None, so default to 0
                        transaction.set_measurement("cost_usd", message.total_cost_usd or 0)
                        transaction.set_measurement("duration_ms", message.duration_ms)

                transaction.set_status("ok")
                return messages

            except CLIConnectionError as e:
                sentry_sdk.capture_exception(e)
                transaction.set_status("internal_error")
                raise

            except APIError as e:
                # Capturar con contexto adicional
                sentry_sdk.capture_exception(e, scope=scope)
                transaction.set_status("unavailable")
                raise

            except Exception as e:
                sentry_sdk.capture_exception(e)
                transaction.set_status("unknown_error")
                raise


def track_error_rate(error: Exception, metric_name: str = "agent.error_rate"):
    """Record an error in the metrics system."""
    error_type = type(error).__name__
    # Placeholder: connect your metrics backend here (Datadog, Prometheus, etc.)
    print(f"METRIC: {metric_name}{{error_type={error_type}}} += 1")
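
`track_error_rate` above only prints a counter increment. Until a real metrics backend is connected, an in-process sliding window is often enough to reason about error rates locally. The sketch below is a hypothetical helper (`ErrorRateTracker` is not part of Sentry or the SDK) and takes an injectable clock so it can be tested deterministically.

```python
import time
from collections import Counter, deque
from typing import Callable, Deque, Optional, Tuple


class ErrorRateTracker:
    """Tracks successes and failures over a sliding time window."""

    def __init__(
        self,
        window_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.window = window_seconds
        self._clock = clock
        # Each entry is (timestamp, error type name or None for a success)
        self._events: Deque[Tuple[float, Optional[str]]] = deque()

    def record(self, error: Optional[BaseException] = None) -> None:
        """Record one run; pass the exception if the run failed."""
        now = self._clock()
        kind = type(error).__name__ if error is not None else None
        self._events.append((now, kind))
        self._evict(now)

    def _evict(self, now: float) -> None:
        # Drop events that fell out of the window
        while self._events and now - self._events[0][0] > self.window:
            self._events.popleft()

    def error_rate(self) -> float:
        """Fraction of runs in the window that failed (0.0 if no runs)."""
        self._evict(self._clock())
        if not self._events:
            return 0.0
        errors = sum(1 for _, kind in self._events if kind is not None)
        return errors / len(self._events)

    def by_type(self) -> Counter:
        """Failure counts in the window, grouped by exception type name."""
        self._evict(self._clock())
        return Counter(kind for _, kind in self._events if kind is not None)
```

A rate computed this way could drive an alert, or feed the same threshold a circuit breaker uses, before the numbers are exported to an external system.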

10. Graceful Degradation

Fallback to a cheaper model

from claude_code_sdk import query, ClaudeCodeOptions
from claude_code_sdk.errors import APIError
from typing import Optional


MODELS = [
    "claude-opus-4-5",   # Best, most expensive
    "claude-sonnet-4-5", # Balanced
    "claude-haiku-4-5",  # Cheapest, fastest
]


async def query_with_model_fallback(
    prompt: str,
    cwd: str,
    preferred_model: str = "claude-opus-4-5"
):
    """Try the preferred model first; fall back to cheaper ones if it is unavailable."""
    if preferred_model in MODELS:
        models_to_try = MODELS[MODELS.index(preferred_model):]
    else:
        # Unknown model name: try it first, then the whole known list
        models_to_try = [preferred_model, *MODELS]

    last_error = None

    for model in models_to_try:
        try:
            print(f"Intentando con modelo: {model}")
            messages = []
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    cwd=cwd,
                    model=model
                )
            ):
                messages.append(message)

            if model != preferred_model:
                print(f"⚠️  Usando modelo de fallback: {model}")

            return messages, model

        except APIError as e:
            status = getattr(e, "status_code", 0)
            if status in (429, 503):
                # Rate limit o servicio degradado - probar siguiente modelo
                print(f"Modelo {model} no disponible ({status}), probando siguiente...")
                last_error = e
                continue
            else:
                raise  # Otros errores no son de modelo, propagar

    raise last_error or RuntimeError("Todos los modelos fallaron")


async def query_with_cache_fallback(
    prompt: str,
    cwd: str,
    cache_key: str,
    cache: dict
):
    """Usa respuesta cacheada si la API falla."""
    try:
        messages = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)

        # Guardar en cache si fue exitoso
        cache[cache_key] = messages
        return messages

    except Exception as e:
        # Verificar si hay respuesta cacheada
        if cache_key in cache:
            print(f"⚠️  API falló ({e}), usando respuesta cacheada")
            return cache[cache_key]
        raise
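
`query_with_cache_fallback` leaves the choice of `cache_key` to the caller. One reasonable option is to derive it from everything that influences the answer, so that a cached response is never served for a different prompt or project. The helper below is a hypothetical sketch of that idea. The choice of fields (prompt, working directory, model) is an assumption and may need extending for your setup.

```python
import hashlib
import json


def make_cache_key(prompt: str, cwd: str, model: str = "") -> str:
    """Build a stable cache key from the inputs that shape the response."""
    # sort_keys gives a canonical serialization regardless of dict ordering
    payload = json.dumps(
        {"prompt": prompt, "cwd": cwd, "model": model},
        sort_keys=True,
        ensure_ascii=False,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    key = make_cache_key("Summarize the README", "/srv/project")
    print(key[:16])
```

Note that a cached answer reflects the repository as it was when the answer was produced. For tasks that read files, including a commit hash in the key is one way to avoid serving stale results.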

11. Complete Example: Ultra-Resilient Agent

This example combines all the patterns covered in this chapter:

# resilient_agent.py
import asyncio
import time
import uuid
import structlog
from dataclasses import dataclass, field
from typing import Optional, Callable

from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
from claude_code_sdk.errors import CLINotFoundError, CLIConnectionError, APIError

logger = structlog.get_logger(__name__)


@dataclass
class ResilientAgentConfig:
    # Retry
    max_retries: int = 3
    initial_delay: float = 2.0
    max_delay: float = 120.0

    # Timeout
    total_timeout_seconds: float = 600.0
    tool_inactivity_timeout: float = 60.0

    # Circuit Breaker
    circuit_failure_threshold: int = 5
    circuit_timeout_seconds: float = 120.0

    # Rollback
    enable_git_rollback: bool = True

    # HITL
    enable_human_escalation: bool = True
    human_response_timeout: float = 120.0

    # Fallback models, in order of preference
    models: list = field(default_factory=lambda: [
        "claude-opus-4-5",
        "claude-sonnet-4-5",
        "claude-haiku-4-5"
    ])


class ResilientAgent:
    def __init__(self, config: ResilientAgentConfig = None):
        self.config = config or ResilientAgentConfig()
        self._circuit_state = "CLOSED"
        self._circuit_failures = 0
        self._circuit_last_failure: Optional[float] = None

    def _check_circuit(self):
        if self._circuit_state == "OPEN":
            elapsed = time.time() - (self._circuit_last_failure or 0)
            if elapsed >= self.config.circuit_timeout_seconds:
                self._circuit_state = "HALF_OPEN"
                logger.info("circuit_half_open")
            else:
                remaining = self.config.circuit_timeout_seconds - elapsed
                raise RuntimeError(
                    f"Circuit breaker abierto. Reintenta en {remaining:.0f}s"
                )

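    def _on_circuit_success(self):
        # Any success clears the failure count; otherwise isolated failures
        # spread across many successful runs would eventually trip the breaker
        self._circuit_failures = 0
        if self._circuit_state == "HALF_OPEN":
            self._circuit_state = "CLOSED"
            logger.info("circuit_closed")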

    def _on_circuit_failure(self, error: Exception):
        self._circuit_failures += 1
        self._circuit_last_failure = time.time()
        if self._circuit_failures >= self.config.circuit_failure_threshold:
            if self._circuit_state != "OPEN":
                self._circuit_state = "OPEN"
                logger.warning("circuit_opened", failures=self._circuit_failures)

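    async def _git_checkpoint(self, cwd: str) -> Optional[str]:
        """Record the current HEAD as the rollback target.

        No commit is created here, so a later rollback (git reset --hard) also
        discards any uncommitted changes that existed before the agent started.
        """
        if not self.config.enable_git_rollback:
            return None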
        import subprocess
        try:
            result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                cwd=cwd, capture_output=True, text=True, timeout=10
            )
            if result.returncode == 0:
                checkpoint = result.stdout.strip()
                logger.info("git_checkpoint_created", commit=checkpoint[:8])
                return checkpoint
        except Exception as e:
            logger.warning("git_checkpoint_failed", error=str(e))
        return None

    async def _git_rollback(self, cwd: str, checkpoint: str):
        if not checkpoint:
            return
        import subprocess
        try:
            subprocess.run(
                ["git", "reset", "--hard", checkpoint],
                cwd=cwd, capture_output=True, text=True, timeout=30, check=True
            )
            logger.info("git_rollback_complete", checkpoint=checkpoint[:8])
        except Exception as e:
            logger.error("git_rollback_failed", error=str(e))

    async def _ask_human(self, question: str) -> str:
        if not self.config.enable_human_escalation:
            return ""
        try:
            # get_running_loop() is the supported call inside a coroutine
            loop = asyncio.get_running_loop()
            print(f"\n🤔 Agente necesita ayuda:\n{question}\n> ", end="", flush=True)
            response = await asyncio.wait_for(
                loop.run_in_executor(None, input, ""),
                timeout=self.config.human_response_timeout
            )
            return response.strip()
        except asyncio.TimeoutError:
            logger.info("human_escalation_timeout")
            return ""

    async def run(
        self,
        prompt: str,
        cwd: str,
        on_message: Optional[Callable] = None
    ) -> list:
        session_id = str(uuid.uuid4())[:8]
        log = logger.bind(session_id=session_id, cwd=cwd)
        log.info("resilient_agent_start")

        # 1. Verify that the CLI is available
        import subprocess
        try:
            subprocess.run(["claude", "--version"], capture_output=True, timeout=5, check=True)
        except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
            log.error("cli_not_found")
            raise CLINotFoundError("Claude Code CLI is not installed or not responding")

        # 2. Record a git checkpoint
        checkpoint = await self._git_checkpoint(cwd)

        # 3. Run with retry + circuit breaker + timeout
        last_error = None
        model_idx = 0

        for attempt in range(self.config.max_retries + 1):
            try:
                self._check_circuit()

                model = self.config.models[min(model_idx, len(self.config.models) - 1)]
                if model_idx > 0:
                    log.warning("using_fallback_model", model=model, attempt=attempt)

                messages = []

                async def _run_with_timeout():
                    # asyncio.timeout() requires Python 3.11+
                    async with asyncio.timeout(self.config.total_timeout_seconds):
                        async for message in query(
                            prompt=prompt,
                            options=ClaudeCodeOptions(cwd=cwd, model=model)
                        ):
                            messages.append(message)
                            if on_message:
                                await on_message(message)

                await _run_with_timeout()

                self._on_circuit_success()
                log.info("resilient_agent_success", attempts=attempt + 1, model=model)
                return messages

            except CLINotFoundError:
                log.error("cli_not_found_during_run")
                raise  # Fatal, no reintentable

            except asyncio.TimeoutError as e:
                last_error = e
                log.warning("timeout", attempt=attempt, timeout=self.config.total_timeout_seconds)
                self._on_circuit_failure(e)
                if checkpoint:
                    await self._git_rollback(cwd, checkpoint)

            except APIError as e:
                last_error = e
                status = getattr(e, "status_code", 0)
                self._on_circuit_failure(e)

                if status == 429:
                    # Fall back to 60s when the error carries no usable retry_after
                    retry_after = getattr(e, "retry_after", None) or 60
                    log.warning("rate_limit", retry_after=retry_after)
                    await asyncio.sleep(retry_after)
                    continue

                elif status in (500, 503):
                    model_idx += 1  # Try the next fallback model

                elif status in (400, 401):
                    log.error("api_fatal_error", status=status)
                    raise

            except (CLIConnectionError, ConnectionError) as e:
                last_error = e
                self._on_circuit_failure(e)
                log.warning("connection_error", error=str(e), attempt=attempt)

            # Wait with exponential backoff before the next attempt
            if attempt < self.config.max_retries:
                delay = min(
                    self.config.initial_delay * (2 ** attempt),
                    self.config.max_delay
                )
                log.info("retry_delay", delay=delay, next_attempt=attempt + 2)
                await asyncio.sleep(delay)

        # All attempts exhausted: escalate to the human
        if self.config.enable_human_escalation and last_error:
            response = await self._ask_human(
                f"The agent failed after {self.config.max_retries + 1} attempts.\n"
                f"Last error: {last_error}\n\n"
                "Try again with additional instructions? (or press Enter to give up)"
            )
            if response:
                return await self.run(
                    prompt=f"{prompt}\n\nAdditional instructions: {response}",
                    cwd=cwd,
                    on_message=on_message
                )

        log.error("resilient_agent_failed", max_retries=self.config.max_retries)
        raise last_error or RuntimeError("Agent failed without a captured error")
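
The retry loop above computes each delay as `initial_delay * 2 ** attempt`, capped at `max_delay`. To see what a given configuration implies before deploying it, the schedule can be computed in isolation. The function below is a standalone sketch. `backoff_delays` and its `jitter` parameter are hypothetical additions, since the agent above does not apply jitter itself.

```python
import random
from typing import List, Optional


def backoff_delays(
    initial_delay: float,
    max_delay: float,
    max_retries: int,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait before each retry, capped at max_delay.

    `jitter` is the maximum extra fraction added to each delay
    (0.5 means up to +50%), which spreads out clients that fail together.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(initial_delay * (2 ** attempt), max_delay)
        if jitter > 0:
            delay = min(delay + rng.uniform(0, jitter * delay), max_delay)
        delays.append(delay)
    return delays


if __name__ == "__main__":
    # Defaults from ResilientAgentConfig: 2s initial delay, 120s cap, 3 retries
    print(backoff_delays(2.0, 120.0, 3))  # [2.0, 4.0, 8.0]
    print(backoff_delays(2.0, 120.0, 3, jitter=0.5))
```

With the default configuration the agent waits at most 14 seconds in total between attempts, so the 120-second cap only matters once `max_retries` is raised to seven or more.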


TypeScript: simplified version of the resilient agent

// resilientAgent.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import pino from "pino";

const log = pino({ level: "info" });

interface ResilientConfig {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  timeoutMs: number;
  models: string[];
}

const DEFAULT_CONFIG: ResilientConfig = {
  maxRetries: 3,
  initialDelayMs: 2000,
  maxDelayMs: 120_000,
  timeoutMs: 300_000,
  models: ["claude-opus-4-5", "claude-sonnet-4-5", "claude-haiku-4-5"],
};

export async function runResilientAgent(
  prompt: string,
  cwd: string,
  config: Partial<ResilientConfig> = {}
): Promise<unknown[]> {
  const cfg = { ...DEFAULT_CONFIG, ...config };
  let lastError: Error | null = null;
  let modelIdx = 0;

  for (let attempt = 0; attempt <= cfg.maxRetries; attempt++) {
    const model = cfg.models[Math.min(modelIdx, cfg.models.length - 1)];
    const sessionLog = log.child({ attempt, model, cwd });

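    try {
      sessionLog.info("attempt_start");
      const messages: unknown[] = [];
      let timeoutId: ReturnType<typeof setTimeout> | undefined;
      // Rejects after timeoutMs so that a stalled stream cannot block forever
      const timeout = new Promise<never>((_, reject) => {
        timeoutId = setTimeout(() => reject(new Error("Timeout")), cfg.timeoutMs);
      });
      const iterator = query({
        prompt,
        options: { cwd, model } as ClaudeCodeOptions,
      })[Symbol.asyncIterator]();

      try {
        while (true) {
          // Race every read against the deadline: checking an abort flag only
          // when a message arrives would never fire if the stream hangs
          const next = await Promise.race([iterator.next(), timeout]);
          if (next.done) break;
          messages.push(next.value);
        }
      } finally {
        clearTimeout(timeoutId);
        // Best-effort cleanup; not awaited because a hung stream may never settle
        void iterator.return?.(undefined)?.catch(() => {});
      }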

      sessionLog.info("attempt_success");
      return messages;

    } catch (error) {
      lastError = error as Error;
      sessionLog.warn({ error: lastError.message }, "attempt_failed");

      const errMsg = lastError.message.toLowerCase();

      if (errMsg.includes("not installed") || errMsg.includes("401")) {
        throw lastError; // Fatal
      }

      if (errMsg.includes("500") || errMsg.includes("503")) {
        modelIdx++; // Probar modelo alternativo
      }

      if (attempt < cfg.maxRetries) {
        const delay = Math.min(cfg.initialDelayMs * Math.pow(2, attempt), cfg.maxDelayMs);
        sessionLog.info({ delay }, "retry_scheduled");
        await new Promise((r) => setTimeout(r, delay));
      }
    }
  }

  throw lastError ?? new Error("Agente falló sin error capturado");
}

Chapter Summary

mindmap
  root((Resilience))
    Error Types
      CLINotFoundError
        Fatal
      CLIConnectionError
        Retryable
      APIError
        429 Rate Limit
        500 Server
        401 Fatal
      TimeoutError
        Cleanup
        Rollback
    Patterns
      Retry
        Exponential Backoff
        Jitter
        tenacity / p-retry
      Circuit Breaker
        CLOSED OPEN HALF-OPEN
        Protects a failing service
      Timeout
        asyncio.wait_for
        AbortController
        Per tool
      Rollback
        Git checkpoint
        File backup
        Transactional
    Escalation
      Human-in-the-Loop
      Structured logs
      Sentry observability
    Graceful Degradation
      Alternative model
      Cache fallback
      Reduced mode

With these patterns, your agents can handle network failures, API errors, timeouts, and unexpected model behavior, recovering intelligently without human intervention in most cases. Resilience is the difference between a prototype and a production system.