Chapter 13: Error Handling and Resilience
An agent that fails in production and cannot recover is useless. This chapter turns fragile agents into robust systems that handle errors, retry intelligently, and, when they do fail, fail in a controlled way.
1. Error Types in the SDK
Complete error taxonomy
The Claude Code SDK can raise several types of errors, each with a different recovery strategy. Understanding the nature of each error is the first step toward handling it correctly.
graph TD
    A[SDK error] --> B{Type?}
    B -->|CLINotFoundError| C[Claude Code not installed]
    B -->|CLIConnectionError| D[Connection failed]
    B -->|ProcessError| E[Process exited abnormally]
    B -->|API Error| F{HTTP status?}
    B -->|TimeoutError| G[Timed out]
    B -->|ToolError| H[Tool failed]
    C --> C1[Fatal: install Claude Code]
    D --> D1[Retryable with backoff]
    E --> E1{Exit code?}
    E1 -->|1| E2[Generic error - check logs]
    E1 -->|2| E3[Incorrect usage - fatal]
    E1 -->|130| E4[SIGINT - clean cancellation]
    F -->|429| F1[Rate limit - exponential backoff]
    F -->|500/503| F2[Server error - retryable]
    F -->|400| F3[Bad request - fatal]
    F -->|401| F4[Auth error - check API key]
    G --> G1[Cancel and retry with a longer timeout]
    H --> H1[Agent can self-recover]
    style C1 fill:#ff6b6b,color:#fff
    style D1 fill:#ffa94d,color:#fff
    style F1 fill:#ffa94d,color:#fff
    style F3 fill:#ff6b6b,color:#fff
    style F4 fill:#ff6b6b,color:#fff
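The taxonomy can also be expressed as a small lookup that maps an error to a recovery action. This is a minimal sketch that does not import the SDK: the `Action` enum, the string error kinds, and both lookup tables are hypothetical names introduced only for illustration, and treating exit code 1 as non-retryable is an assumption (the diagram only says to check the logs).

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    FATAL = "fatal"          # stop: retrying cannot help
    RETRY = "retry"          # transient: retry with backoff
    CANCELLED = "cancelled"  # user interrupted: exit quietly


# Hypothetical tables that mirror the branches of the diagram.
HTTP_ACTIONS = {
    429: Action.RETRY,
    500: Action.RETRY,
    503: Action.RETRY,
    400: Action.FATAL,
    401: Action.FATAL,
}
EXIT_CODE_ACTIONS = {
    1: Action.FATAL,        # assumption: generic errors need a human to read the logs
    2: Action.FATAL,
    130: Action.CANCELLED,
}


def classify(kind: str, code: Optional[int] = None) -> Action:
    """Map an error kind (plus an optional HTTP status or exit code) to an action."""
    if kind == "cli_not_found":
        return Action.FATAL
    if kind in ("cli_connection", "timeout"):
        return Action.RETRY
    if kind == "api":
        return HTTP_ACTIONS.get(code, Action.FATAL)
    if kind == "process":
        return EXIT_CODE_ACTIONS.get(code, Action.FATAL)
    return Action.FATAL  # unknown errors fail closed


print(classify("api", 429))      # Action.RETRY
print(classify("process", 130))  # Action.CANCELLED
```

Failing closed on unknown errors is a deliberate choice here: an unrecognized failure is surfaced rather than retried blindly.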
CLINotFoundError
Raised when the claude binary is not installed or is not on the PATH:
from claude_code_sdk import query, ClaudeCodeOptions
from claude_code_sdk.errors import CLINotFoundError

async def safe_query_with_install_check(prompt: str, cwd: str):
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            yield message
    except CLINotFoundError as e:
        print("FATAL ERROR: Claude Code is not installed.")
        print("Install with: npm install -g @anthropic-ai/claude-code")
        print(f"Details: {e}")
        raise SystemExit(1) from e  # Retrying makes no sense

def verify_cli_available() -> bool:
    """Check that the CLI is available before trying to use the SDK."""
    import subprocess
    try:
        result = subprocess.run(
            ["claude", "--version"],
            capture_output=True, text=True, timeout=5
        )
        return result.returncode == 0
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
CLIConnectionError
Raised when the SDK cannot connect to the Claude Code process. It is usually transient:
from claude_code_sdk.errors import CLIConnectionError
import asyncio

async def query_with_connection_retry(prompt: str, cwd: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            # Note: if the connection drops mid-stream, the retry restarts the
            # query, so messages already yielded may be yielded again.
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=cwd)
            ):
                yield message
            return  # Success
        except CLIConnectionError as e:
            if attempt == max_retries - 1:
                raise  # Last attempt, propagate the error
            wait = 2 ** attempt  # Exponential backoff: 1s, 2s, ... (no wait after the last attempt)
            print(f"Connection error (attempt {attempt + 1}/{max_retries}): {e}. Retrying in {wait}s...")
            await asyncio.sleep(wait)
ProcessError
The Claude Code process exited with a non-zero exit code:
from claude_code_sdk.errors import ProcessError

class RateLimitError(Exception):
    """Anthropic rate-limit error."""
    pass

async def handle_process_errors(prompt: str, cwd: str):
    """Handle ProcessError with detailed diagnostics."""
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            yield message
    except ProcessError as e:
        exit_code = getattr(e, "exit_code", -1)
        stderr = getattr(e, "stderr", "") or ""  # stderr may be None
        if exit_code == 130:
            # SIGINT - normal cancellation
            print("Agent cancelled by user signal")
            return
        if exit_code == 1:
            # Generic agent error
            print(f"The agent exited with an error. Stderr: {stderr}")
            if "rate limit" in stderr.lower():
                raise RateLimitError("Rate limit reached") from e
        elif exit_code == 2:
            # Incorrect usage - fatal, not retryable
            print(f"Configuration error: {stderr}")
        # Propagate every case except a clean cancellation,
        # including exit codes not listed above
        raise
APIError and rate limiting
from claude_code_sdk.errors import APIError
import asyncio

async def query_with_api_error_handling(
    prompt: str,
    cwd: str,
    max_rate_limit_retries: int = 3
):
    """Handle API errors, including rate limits."""
    for attempt in range(max_rate_limit_retries + 1):
        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=cwd)
            ):
                yield message
            return
        except APIError as e:
            status_code = getattr(e, "status_code", 0)
            if status_code == 429 and attempt < max_rate_limit_retries:
                # Rate limit - wait and retry, a bounded number of times
                retry_after = getattr(e, "retry_after", None) or 60
                print(f"Rate limit reached. Waiting {retry_after}s...")
                await asyncio.sleep(retry_after)
                continue
            if status_code in (500, 502, 503, 504):
                # Server errors - retryable by the caller
                print(f"Server error ({status_code}). The service may be degraded.")
            elif status_code == 401:
                # Authentication - fatal
                print("ERROR: invalid or expired API key")
                raise SystemExit(1) from e
            elif status_code == 400:
                # Bad request - bug in the client code
                print(f"Error in the request: {e}")
            raise
2. Retry with Exponential Backoff
Implementation from scratch
import asyncio
import random
from dataclasses import dataclass
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

@dataclass
class RetryConfig:
    max_retries: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_exceptions: tuple = (Exception,)

def calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Compute the delay with exponential backoff and optional jitter."""
    delay = min(
        config.initial_delay * (config.exponential_base ** attempt),
        config.max_delay
    )
    if config.jitter:
        # Jitter of ±20% to avoid the "thundering herd"
        jitter_amount = delay * 0.2
        delay += random.uniform(-jitter_amount, jitter_amount)
    return max(0, delay)

async def retry_async(
    func: Callable,
    config: RetryConfig,
    *args,
    on_retry: Optional[Callable] = None,
    **kwargs
):
    """Run an async function with automatic retries."""
    last_exception = None
    for attempt in range(config.max_retries + 1):
        try:
            return await func(*args, **kwargs)
        except config.retryable_exceptions as e:
            last_exception = e
            if attempt == config.max_retries:
                print(f"Giving up after {config.max_retries + 1} attempts")
                raise
            delay = calculate_delay(attempt, config)
            if on_retry:
                on_retry(attempt + 1, e, delay)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)
    raise last_exception

# Version specifically for agents
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
from claude_code_sdk.errors import CLIConnectionError, APIError

AGENT_RETRY_CONFIG = RetryConfig(
    max_retries=3,
    initial_delay=2.0,
    max_delay=120.0,
    exponential_base=2.0,
    jitter=True,
    retryable_exceptions=(CLIConnectionError, APIError, ConnectionError)
)

async def resilient_query(prompt: str, cwd: str) -> list:
    """Wrapper around query() with built-in retries."""
    messages = []

    async def _run():
        nonlocal messages
        messages = []  # Discard partial output from a failed attempt
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

    return await retry_async(
        _run,
        AGENT_RETRY_CONFIG,
        on_retry=lambda attempt, err, delay: print(
            f"[Retry {attempt}] Error: {err} - waiting {delay:.1f}s"
        )
    )
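To see what a retry configuration costs in waiting time, the capped exponential schedule can be computed up front. This is a minimal sketch with jitter left out so the values are deterministic; `backoff_schedule` is a hypothetical helper, and the numbers passed to it are the same ones used for the agent configuration (3 retries, 2 s initial delay, base 2, 120 s cap).

```python
from typing import List


def backoff_schedule(max_retries: int, initial_delay: float,
                     base: float, max_delay: float) -> List[float]:
    """Delays in seconds slept before each retry, without jitter."""
    return [
        min(initial_delay * base ** attempt, max_delay)
        for attempt in range(max_retries)
    ]


schedule = backoff_schedule(max_retries=3, initial_delay=2.0, base=2.0, max_delay=120.0)
print(schedule)       # [2.0, 4.0, 8.0]
print(sum(schedule))  # 14.0
```

With ±20% jitter each delay can vary by up to a fifth in either direction, so the worst-case total wait for this schedule is about 16.8 s on top of the time spent in the attempts themselves.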
Using tenacity (Python library)
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
    after_log,
)
import logging

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((CLIConnectionError, APIError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO),
    reraise=True
)
async def query_with_tenacity(prompt: str, cwd: str):
    """Agent with automatic retries using tenacity."""
    messages = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        messages.append(message)
    return messages
TypeScript with p-retry
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import pRetry, { AbortError } from "p-retry";

interface RetryOptions {
  maxRetries?: number;
  minTimeout?: number;
  maxTimeout?: number;
}

async function resilientQuery(
  prompt: string,
  cwd: string,
  options: RetryOptions = {}
): Promise<unknown[]> {
  const { maxRetries = 3, minTimeout = 2000, maxTimeout = 60000 } = options;
  return pRetry(
    async () => {
      const messages: unknown[] = [];
      try {
        for await (const message of query({
          prompt,
          options: { cwd } as ClaudeCodeOptions,
        })) {
          messages.push(message);
        }
        return messages;
      } catch (error) {
        const err = error as Error;
        // Errors that must NOT be retried
        if (err.message.includes("not installed") || err.message.includes("401")) {
          throw new AbortError(err.message);
        }
        throw error; // Retryable; logged once in onFailedAttempt
      }
    },
    {
      retries: maxRetries,
      minTimeout,
      maxTimeout,
      factor: 2,
      randomize: true,
      onFailedAttempt: (error) => {
        // The error carries attemptNumber and retriesLeft
        console.warn(
          `Attempt ${error.attemptNumber} of ${error.retriesLeft + error.attemptNumber} failed: ${error.message}`
        );
      },
    }
  );
}
3. Circuit Breaker Pattern
Why use a Circuit Breaker?
If the Anthropic API is down, retrying indefinitely only makes things worse. The Circuit Breaker "opens the circuit" after N consecutive failures and rejects new requests immediately. After a timeout it moves to a half-open state and lets trial requests through: if they succeed the circuit closes again, and if one fails it reopens.
stateDiagram-v2
    [*] --> CLOSED
    CLOSED --> OPEN : failures >= threshold
    OPEN --> HALF_OPEN : timeout elapsed
    HALF_OPEN --> CLOSED : success
    HALF_OPEN --> OPEN : failure
    CLOSED : CLOSED\nNormal requests\nCounting failures
    OPEN : OPEN\nRejects everything\nWaiting for timeout
    HALF_OPEN : HALF_OPEN\nOne trial request\nDeciding state
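The state machine is easier to follow when time is passed in explicitly instead of read from a clock. The following is a deliberately reduced sketch under that assumption: `TinyBreaker` is a hypothetical class, it closes after a single successful trial, and it keeps no statistics.

```python
from dataclasses import dataclass


@dataclass
class TinyBreaker:
    failure_threshold: int = 3
    timeout_seconds: float = 60.0
    state: str = "CLOSED"
    failures: int = 0
    opened_at: float = 0.0

    def allow(self, now: float) -> bool:
        """Return True if a request may go through at time `now`."""
        if self.state == "OPEN" and now - self.opened_at >= self.timeout_seconds:
            self.state = "HALF_OPEN"  # timeout elapsed: allow a trial request
        return self.state != "OPEN"

    def record(self, success: bool, now: float) -> None:
        """Record the outcome of a request that was allowed through."""
        if success:
            self.state, self.failures = "CLOSED", 0
            return
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state, self.opened_at = "OPEN", now


cb = TinyBreaker(failure_threshold=3, timeout_seconds=60.0)
for t in (0.0, 1.0, 2.0):
    cb.record(success=False, now=t)   # third failure opens the circuit
print(cb.state)                       # OPEN
print(cb.allow(now=10.0))             # False: still inside the timeout
print(cb.allow(now=62.0))             # True: trial request in HALF_OPEN
cb.record(success=True, now=63.0)
print(cb.state)                       # CLOSED
```

Injecting `now` keeps the transitions testable without sleeping; a production breaker would read a monotonic clock instead.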
Python implementation
import asyncio
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Callable, Any, Tuple, Type, Union

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 60.0
    # A single exception class or a tuple of classes, as accepted by `except`
    expected_exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception

@dataclass
class CircuitBreakerStats:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    rejected_requests: int = 0
    state_changes: list = field(default_factory=list)

class CircuitBreaker:
    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.config = config
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self.stats = CircuitBreakerStats()

    @property
    def state(self) -> CircuitState:
        # Reading the state also performs the OPEN -> HALF_OPEN transition
        if self._state == CircuitState.OPEN:
            elapsed = time.monotonic() - (self._last_failure_time or 0)
            if elapsed >= self.config.timeout_seconds:
                self._transition_to(CircuitState.HALF_OPEN)
        return self._state

    def _transition_to(self, new_state: CircuitState):
        old_state = self._state
        self._state = new_state
        self.stats.state_changes.append({
            "from": old_state.value,
            "to": new_state.value,
            "time": time.time()
        })
        print(f"[CircuitBreaker:{self.name}] {old_state.value} → {new_state.value}")

    def _on_success(self):
        self.stats.successful_requests += 1
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.config.success_threshold:
                self._failure_count = 0
                self._success_count = 0
                self._transition_to(CircuitState.CLOSED)
        elif self._state == CircuitState.CLOSED:
            self._failure_count = 0

    def _on_failure(self, exc: Exception):
        self.stats.failed_requests += 1
        self._failure_count += 1
        # Monotonic clock: elapsed time is not affected by wall-clock changes
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            self._success_count = 0
            self._transition_to(CircuitState.OPEN)
        elif self._state == CircuitState.CLOSED:
            if self._failure_count >= self.config.failure_threshold:
                self._transition_to(CircuitState.OPEN)

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        self.stats.total_requests += 1
        if self.state == CircuitState.OPEN:
            self.stats.rejected_requests += 1
            raise CircuitOpenError(
                f"CircuitBreaker '{self.name}' is open. "
                f"Service considered unavailable."
            )
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.config.expected_exception as e:
            self._on_failure(e)
            raise

    def get_metrics(self) -> dict:
        success_rate = (
            self.stats.successful_requests / self.stats.total_requests
            if self.stats.total_requests > 0 else 0
        )
        return {
            "state": self.state.value,
            "failure_count": self._failure_count,
            "total_requests": self.stats.total_requests,
            "success_rate": f"{success_rate:.1%}",
            "rejected_requests": self.stats.rejected_requests,
        }

class CircuitOpenError(Exception):
    """The circuit breaker is open."""
    pass

# Global circuit breaker instance for the SDK
from claude_code_sdk.errors import CLIConnectionError, APIError

anthropic_circuit_breaker = CircuitBreaker(
    name="anthropic_api",
    config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=2,
        timeout_seconds=120.0,
        expected_exception=(CLIConnectionError, APIError, ConnectionError)
    )
)

async def protected_query(prompt: str, cwd: str):
    """Query protected by the circuit breaker."""
    async def _run():
        messages = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

    try:
        return await anthropic_circuit_breaker.call(_run)
    except CircuitOpenError as e:
        print(f"Service unavailable: {e}")
        print(f"Metrics: {anthropic_circuit_breaker.get_metrics()}")
        raise
TypeScript: Circuit Breaker
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

interface CircuitBreakerOptions {
  failureThreshold: number;
  successThreshold: number;
  timeoutMs: number;
}

class CircuitBreaker {
  private state: CircuitState = "CLOSED";
  private failureCount = 0;
  private successCount = 0;
  private lastFailureTime = 0;

  constructor(
    private readonly name: string,
    private readonly options: CircuitBreakerOptions
  ) {}

  private get currentState(): CircuitState {
    if (this.state === "OPEN") {
      const elapsed = Date.now() - this.lastFailureTime;
      if (elapsed >= this.options.timeoutMs) {
        this.state = "HALF_OPEN";
        console.log(`[CB:${this.name}] OPEN → HALF_OPEN`);
      }
    }
    return this.state;
  }

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.currentState === "OPEN") {
      throw new Error(`Circuit breaker '${this.name}' is open`);
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    if (this.state === "HALF_OPEN") {
      this.successCount++;
      if (this.successCount >= this.options.successThreshold) {
        this.state = "CLOSED";
        this.failureCount = 0;
        this.successCount = 0;
        console.log(`[CB:${this.name}] HALF_OPEN → CLOSED`);
      }
    } else {
      this.failureCount = 0;
    }
  }

  private onFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    if (this.state === "HALF_OPEN" || this.failureCount >= this.options.failureThreshold) {
      this.state = "OPEN";
      this.successCount = 0; // A failed trial discards earlier trial successes
      console.log(`[CB:${this.name}] → OPEN (failures: ${this.failureCount})`);
    }
  }

  getState(): CircuitState {
    return this.currentState;
  }
}

const anthropicCB = new CircuitBreaker("anthropic", {
  failureThreshold: 5,
  successThreshold: 2,
  timeoutMs: 120_000,
});

async function protectedQuery(prompt: string, cwd: string) {
  return anthropicCB.execute(async () => {
    const messages: unknown[] = [];
    for await (const message of query({
      prompt,
      options: { cwd } as ClaudeCodeOptions,
    })) {
      messages.push(message);
    }
    return messages;
  });
}
4. Timeout Handling
asyncio.wait_for in Python
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage

async def query_with_timeout(
    prompt: str,
    cwd: str,
    timeout_seconds: float = 300.0  # 5 minutes by default
):
    """Query with an overall timeout and automatic cleanup."""
    async def _run_agent():
        messages = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

    try:
        return await asyncio.wait_for(_run_agent(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print(f"Timeout: the agent took longer than {timeout_seconds}s")
        # Cleanup: set aside partial changes if needed
        await cleanup_partial_changes(cwd)
        raise
async def query_with_per_tool_timeout(
    prompt: str,
    cwd: str,
    total_timeout: float = 300.0,
    tool_timeout: float = 30.0
):
    """Inactivity timeout: fail if no message arrives for tool_timeout seconds."""
    messages = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + total_timeout
    stream = query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ).__aiter__()
    try:
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError(f"Total timeout of {total_timeout}s exceeded")
            try:
                # Wait for the next message, bounded by both timeouts.
                # A background monitor task would not work here: an exception
                # raised inside a separate task never interrupts this loop.
                message = await asyncio.wait_for(
                    stream.__anext__(), timeout=min(tool_timeout, remaining)
                )
            except StopAsyncIteration:
                break
            except asyncio.TimeoutError:
                raise asyncio.TimeoutError(
                    f"No activity for up to {tool_timeout}s - a tool may be hung"
                ) from None
            messages.append(message)
    except asyncio.TimeoutError as e:
        print(f"Timeout: {e}")
        raise
    return messages
async def cleanup_partial_changes(cwd: str):
    """Set aside partial changes if the agent was interrupted."""
    import subprocess
    from pathlib import Path
    git_dir = Path(cwd) / ".git"
    if git_dir.exists():
        try:
            # Save partial changes, including new untracked files, in a stash
            result = subprocess.run(
                ["git", "stash", "push", "--include-untracked",
                 "-m", "agent-partial-changes-timeout"],
                cwd=cwd,
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode == 0:
                print("Partial changes saved in git stash")
        except (subprocess.TimeoutExpired, FileNotFoundError):
            print("Could not run git stash")
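An inactivity timeout can be exercised without calling the SDK by applying `asyncio.wait_for` to each `__anext__()` call of an async iterator. The sketch below uses a fake message stream; `collect_with_idle_timeout` and `fake_agent` are hypothetical names introduced for the example.

```python
import asyncio
from typing import AsyncIterator, List


async def collect_with_idle_timeout(stream: AsyncIterator[str],
                                    idle_timeout: float) -> List[str]:
    """Collect items; raise asyncio.TimeoutError if the gap between two items
    exceeds idle_timeout seconds."""
    items: List[str] = []
    iterator = stream.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
        except StopAsyncIteration:
            return items  # stream finished normally
        items.append(item)


async def fake_agent(delays: List[float]) -> AsyncIterator[str]:
    """Stand-in for an agent stream: sleeps, then yields one message per delay."""
    for i, delay in enumerate(delays):
        await asyncio.sleep(delay)
        yield f"message-{i}"


async def main() -> None:
    print(await collect_with_idle_timeout(fake_agent([0.0, 0.01]), idle_timeout=1.0))
    try:
        await collect_with_idle_timeout(fake_agent([0.0, 0.5]), idle_timeout=0.05)
    except asyncio.TimeoutError:
        print("stream stalled")


asyncio.run(main())
```

Because the timeout wraps the wait itself, a stalled stream is interrupted even when no further message ever arrives.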
AbortController in TypeScript
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

async function queryWithTimeout(
  prompt: string,
  cwd: string,
  timeoutMs: number = 300_000
): Promise<unknown[]> {
  const controller = new AbortController();
  const messages: unknown[] = [];
  // Auto-cancel on timeout
  const timeoutId = setTimeout(() => {
    controller.abort(new Error(`Timeout after ${timeoutMs}ms`));
  }, timeoutMs);
  try {
    for await (const message of query({
      prompt,
      // Assumption: this SDK version accepts an abortController option.
      // Without it, the abort is only noticed when the next message arrives,
      // so a hung agent would never be interrupted.
      options: { cwd, abortController: controller } as ClaudeCodeOptions,
    })) {
      if (controller.signal.aborted) {
        throw new Error("Agent cancelled by timeout");
      }
      messages.push(message);
    }
    return messages;
  } catch (error) {
    if (controller.signal.aborted) {
      console.error(`Agent timed out: ${timeoutMs}ms`);
      await cleanupPartialChanges(cwd);
    }
    throw error;
  } finally {
    clearTimeout(timeoutId);
  }
}

async function cleanupPartialChanges(cwd: string): Promise<void> {
  const { execSync } = await import("child_process");
  try {
    execSync("git stash push --include-untracked -m agent-timeout-cleanup", {
      cwd,
      stdio: "pipe",
      timeout: 10_000,
    });
    console.log("Partial changes saved in stash");
  } catch {
    console.warn("Could not run git stash");
  }
}
5. Handling Tool Errors
What happens when Bash returns exit_code != 0
The Claude agent can recover from many tool errors on its own, but you need a way to detect when it does not:
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage

async def monitor_tool_errors(prompt: str, cwd: str):
    """Monitor tool usage and the final result during execution."""
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(cwd=cwd)
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if not hasattr(block, "name"):
                    continue  # Not a tool-use block
                tool_name = block.name
                tool_input = getattr(block, "input", {})
                # Inspect the tool input for risky patterns
                if tool_name == "Bash":
                    cmd = tool_input.get("command", "")
                    if is_dangerous_command(cmd):
                        print(f"⚠️ Potentially dangerous command: {cmd}")
        elif isinstance(message, ResultMessage):
            # Matches "error" and any more specific subtype prefixed with "error"
            if message.subtype.startswith("error"):
                print("The agent reported an error while completing the task")
        yield message

def is_dangerous_command(cmd: str) -> bool:
    """Detect commands that could be problematic (substring match, so false positives are possible)."""
    dangerous_patterns = [
        "rm -rf /",
        "dd if=/dev/zero",
        "mkfs",
        "> /dev/sda",
        ":(){ :|:& };:"  # Fork bomb
    ]
    cmd_lower = cmd.lower()
    return any(p in cmd_lower for p in dangerous_patterns)
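Detecting that the agent is not recovering usually means counting failures in a row rather than reacting to a single one. A minimal sketch of such a counter follows; `ConsecutiveErrorTracker` is a hypothetical helper, and how each tool result is judged to be an error is left to the caller.

```python
class ConsecutiveErrorTracker:
    """Counts consecutive failures and trips once a limit is reached."""

    def __init__(self, max_consecutive: int = 3) -> None:
        self.max_consecutive = max_consecutive
        self.consecutive = 0
        self.total = 0

    def record(self, is_error: bool) -> bool:
        """Record one tool result; return True when the agent should be stopped."""
        if is_error:
            self.consecutive += 1
            self.total += 1
        else:
            self.consecutive = 0  # any success resets the streak
        return self.consecutive >= self.max_consecutive


tracker = ConsecutiveErrorTracker(max_consecutive=3)
outcomes = [True, True, False, True, True, True]
print([tracker.record(o) for o in outcomes])
# [False, False, False, False, False, True]
```

Resetting on success means isolated failures that the agent fixes by itself never trigger an intervention; only an unbroken run does.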
PostToolUse hook to detect errors
from claude_code_sdk import ClaudeCodeOptions
from typing import Callable, Optional

def create_error_detecting_hook(on_error: Optional[Callable[[dict], None]] = None):
    """Hook that detects errors in tool output and reports them."""
    # Plain keyword matching: simple, but prone to false positives
    # (for example "0 errors" in a passing test run).
    error_keywords = [
        "error", "failed", "exception", "traceback",
        "no such file", "permission denied", "command not found",
        "syntax error", "import error"
    ]

    def post_tool_use(tool_name: str, tool_input: dict, tool_output: str) -> None:
        output_lower = tool_output.lower()
        is_error = any(kw in output_lower for kw in error_keywords)
        if is_error:
            error_info = {
                "tool": tool_name,
                "input": tool_input,
                "output_preview": tool_output[:200],
            }
            if on_error:
                on_error(error_info)
            else:
                print(f"⚠️ Error detected in {tool_name}: {tool_output[:100]}")

    return post_tool_use

# Usage with the SDK
def build_options_with_error_detection(cwd: str) -> ClaudeCodeOptions:
    errors_log = []

    def log_error(error_info: dict):
        errors_log.append(error_info)
        print(f"Tool error logged: {error_info['tool']}")

    # Assumes the installed SDK version accepts a post_tool_use_hook option;
    # check the hook API of your version before relying on it.
    return ClaudeCodeOptions(
        cwd=cwd,
        post_tool_use_hook=create_error_detecting_hook(on_error=log_error)
    )
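Bare keywords such as "error" also match harmless output like "0 errors", so a hook built on them tends to be noisy. One possible refinement, sketched here with hypothetical patterns that would need tuning for real tool output, is to match anchored phrases with regular expressions.

```python
import re

# Hypothetical patterns: specific phrases instead of bare words like "error".
ERROR_PATTERNS = [
    re.compile(r"^Traceback \(most recent call last\):", re.MULTILINE),
    re.compile(r"\b(command not found|permission denied|no such file or directory)\b",
               re.IGNORECASE),
    # A line that starts with an exception name, e.g. "ValueError: bad input"
    re.compile(r"^\w*(Error|Exception): ", re.MULTILINE),
]


def looks_like_error(output: str) -> bool:
    """Return True if the tool output matches one of the error patterns."""
    return any(p.search(output) for p in ERROR_PATTERNS)


print(looks_like_error("12 passed, 0 errors in 0.5s"))        # False
print(looks_like_error("bash: foo: command not found"))        # True
print(looks_like_error("ValueError: invalid literal for int")) # True
```

When a tool exposes an exit code or an explicit error flag, that signal is more reliable than any text matching and should take precedence.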
6. Rollback and Compensation
Automatic backup before editing
import shutil
import tempfile
from pathlib import Path
from datetime import datetime
from typing import Optional
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage

class AgentWithRollback:
    """Agent that backs up the project automatically and can roll back."""

    def __init__(self, project_dir: str):
        self.project_dir = Path(project_dir)
        self.backup_dir: Optional[Path] = None

    def create_backup(self) -> Path:
        """Create a backup of the project directory."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Use the platform's temp directory instead of a hard-coded /tmp
        backup_path = Path(tempfile.gettempdir()) / f"agent_backup_{timestamp}"
        shutil.copytree(self.project_dir, backup_path)
        self.backup_dir = backup_path
        print(f"Backup created at: {backup_path}")
        return backup_path

    def rollback(self):
        """Restore the project from the backup."""
        if not self.backup_dir or not self.backup_dir.exists():
            raise ValueError("No backup available for rollback")
        # Delete the current state
        shutil.rmtree(self.project_dir)
        # Restore the backup
        shutil.copytree(self.backup_dir, self.project_dir)
        print(f"Rollback completed from: {self.backup_dir}")

    def cleanup_backup(self):
        """Delete the backup (call this when the agent finishes successfully)."""
        if self.backup_dir and self.backup_dir.exists():
            shutil.rmtree(self.backup_dir)
            print(f"Backup removed: {self.backup_dir}")
            self.backup_dir = None

    async def run_with_rollback(self, prompt: str) -> list:
        """Run the agent with automatic rollback on error."""
        self.create_backup()
        messages = []
        success = False
        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=str(self.project_dir))
            ):
                messages.append(message)
            # Check that the project is still valid
            if not self._validate_project():
                raise ValueError("The project was left in an invalid state")
            success = True
            return messages
        except Exception as e:
            print(f"Error while running the agent: {e}")
            print("Starting rollback...")
            self.rollback()
            raise
        finally:
            if success:
                self.cleanup_backup()

    def _validate_project(self) -> bool:
        """Check that the project is in a valid state."""
        # Example: check that the tests still pass
        import subprocess
        result = subprocess.run(
            ["python", "-m", "pytest", "--tb=no", "-q"],
            cwd=self.project_dir,
            capture_output=True,
            timeout=60
        )
        return result.returncode == 0
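The backup, restore, and cleanup steps can also be packaged as a context manager, so the restore runs on any exception, including cancellation. The following is a minimal sketch under the assumption that copying the whole directory is affordable; `directory_snapshot` is a hypothetical helper, not part of the SDK.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def directory_snapshot(project_dir: Path) -> Iterator[Path]:
    """Snapshot project_dir; restore it if the body raises, discard the snapshot otherwise."""
    backup_root = Path(tempfile.mkdtemp(prefix="agent_backup_"))
    backup = backup_root / "snapshot"
    shutil.copytree(project_dir, backup)
    try:
        yield backup
    except BaseException:
        # Restore on any failure, including KeyboardInterrupt and task cancellation
        shutil.rmtree(project_dir)
        shutil.copytree(backup, project_dir)
        raise
    finally:
        shutil.rmtree(backup_root, ignore_errors=True)


# Usage with a throwaway directory standing in for a project
project = Path(tempfile.mkdtemp(prefix="demo_project_"))
(project / "app.py").write_text("print('v1')\n")
try:
    with directory_snapshot(project):
        (project / "app.py").write_text("broken")
        raise RuntimeError("agent failed")
except RuntimeError:
    pass
print((project / "app.py").read_text())  # prints: print('v1')
```

`tempfile.mkdtemp` gives each run its own private directory, which avoids name collisions between runs that start within the same second.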
Transactional rollback with Git
import subprocess
from pathlib import Path
from typing import Optional

class GitTransactionalAgent:
    """Agent with transactional behavior built on git."""

    def __init__(self, project_dir: str):
        self.cwd = project_dir
        self._stash_ref: Optional[str] = None

    def _git(self, *args, check=True) -> str:
        result = subprocess.run(
            ["git", *args],
            cwd=self.cwd,
            capture_output=True,
            text=True,
            timeout=30
        )
        if check and result.returncode != 0:
            raise subprocess.CalledProcessError(
                result.returncode, ["git", *args], result.stdout, result.stderr
            )
        return result.stdout.strip()

    def begin_transaction(self) -> str:
        """Create a checkpoint commit to return to."""
        # Save the current state
        self._git("add", "-A")
        try:
            self._git("commit", "-m", "agent-transaction-checkpoint [skip ci]")
            checkpoint = self._git("rev-parse", "HEAD")
            print(f"Checkpoint created: {checkpoint[:8]}")
            return checkpoint
        except subprocess.CalledProcessError:
            # Nothing to commit
            checkpoint = self._git("rev-parse", "HEAD")
            return checkpoint

    def rollback_to_checkpoint(self, checkpoint: str):
        """Revert to the given checkpoint."""
        # Undo commits and tracked changes made after the checkpoint
        self._git("reset", "--hard", checkpoint)
        # reset --hard leaves untracked files in place; remove the files the
        # agent created (ignored files are kept)
        self._git("clean", "-fd")
        print(f"Rolled back to checkpoint: {checkpoint[:8]}")

    def commit_transaction(self, message: str = "agent: applied changes"):
        """Commit the agent's changes on top of the checkpoint."""
        self._git("add", "-A")
        try:
            self._git("commit", "-m", message)
        except subprocess.CalledProcessError:
            pass  # Nothing to commit

    async def run_transactional(self, prompt: str, commit_message: str) -> list:
        from claude_code_sdk import query, ClaudeCodeOptions
        checkpoint = self.begin_transaction()
        messages = []
        try:
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(cwd=self.cwd)
            ):
                messages.append(message)
            self.commit_transaction(commit_message)
            return messages
        except Exception as e:
            print(f"Error: {e}. Rolling back...")
            self.rollback_to_checkpoint(checkpoint)
            raise
7. Human-in-the-Loop for Errors
When to escalate to a human
flowchart TD
    A[Agent error] --> B{Recoverable?}
    B -->|Yes, retryable| C[Automatic retry]
    B -->|Unknown| D{Attempts exhausted?}
    B -->|No, fatal| E[Escalate to human]
    C --> F{Success?}
    F -->|Yes| G[Continue]
    F -->|No, max retries| D
    D -->|Yes| E
    D -->|No| C
    E --> H[AskUserQuestion]
    H --> I{Human response?}
    I -->|Continue differently| J[Retry with feedback]
    I -->|Abandon| K[Terminate gracefully]
    I -->|Timeout| L[Automatic fallback]
    style E fill:#ffa94d,color:#fff
    style K fill:#ff6b6b,color:#fff
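The first half of the flowchart, up to the escalation, reduces to a small decision function. This is a sketch with hypothetical names (`Recoverability`, `NextStep`, `next_step`); how an error is classified in the first place is outside its scope.

```python
from enum import Enum


class Recoverability(Enum):
    RETRYABLE = "retryable"
    UNKNOWN = "unknown"
    FATAL = "fatal"


class NextStep(Enum):
    RETRY = "retry"
    ESCALATE = "escalate"


def next_step(kind: Recoverability, attempts: int, max_retries: int) -> NextStep:
    """Decide whether to retry automatically or escalate to a human."""
    if kind is Recoverability.FATAL:
        return NextStep.ESCALATE      # fatal errors go straight to the human
    if attempts >= max_retries:
        return NextStep.ESCALATE      # retry budget exhausted
    return NextStep.RETRY             # retryable or unknown, budget remaining


print(next_step(Recoverability.RETRYABLE, attempts=1, max_retries=3))  # NextStep.RETRY
print(next_step(Recoverability.UNKNOWN, attempts=3, max_retries=3))    # NextStep.ESCALATE
print(next_step(Recoverability.FATAL, attempts=0, max_retries=3))      # NextStep.ESCALATE
```

Keeping the decision in a pure function makes the escalation policy easy to test separately from the code that talks to the user.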
Escalation pattern
import asyncio
from typing import Optional, Callable

class HumanInTheLoopHandler:
    """Handles escalating errors to a human."""

    def __init__(
        self,
        question_timeout_seconds: float = 120.0,
        ask_question: Optional[Callable] = None
    ):
        self.timeout = question_timeout_seconds
        self._ask_question = ask_question or self._default_ask

    async def _default_ask(self, question: str) -> str:
        """Ask the user in the terminal."""
        print(f"\n{'='*50}")
        print("⚠️ The agent needs your help:")
        print(question)
        print(f"{'='*50}")
        print("Answer (or press Enter to continue with the defaults):")
        loop = asyncio.get_running_loop()
        try:
            # input() blocks, so it runs in a worker thread
            return await asyncio.wait_for(
                loop.run_in_executor(None, input, "> "),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            print(f"\n[Timeout after {self.timeout}s, using the default answer]")
            return ""

    async def handle_error(
        self,
        error: Exception,
        context: dict,
        retry_callback: Callable
    ):
        """Escalate an error to a human and decide how to continue."""
        question = self._format_error_question(error, context)
        response = (await self._ask_question(question)).strip()
        if response.lower() in ("", "continue", "y", "yes"):
            # Plain confirmation: retry without extra guidance
            return await retry_callback(user_guidance="")
        elif response.lower() in ("abort", "n", "no"):
            raise SystemExit("Task abandoned by the user")
        else:
            # The user gave specific instructions
            return await retry_callback(user_guidance=response)

    def _format_error_question(self, error: Exception, context: dict) -> str:
        return f"""
Error encountered: {type(error).__name__}: {error}
Context:
- Task: {context.get('prompt', 'N/A')}
- Directory: {context.get('cwd', 'N/A')}
- Attempts so far: {context.get('attempts', 0)}
What do you want to do?
- Press Enter to retry
- Type additional instructions for the agent
- Type 'abort' to cancel
"""

async def agent_with_hitl(prompt: str, cwd: str):
    """Agent that escalates errors to a human."""
    from claude_code_sdk import query, ClaudeCodeOptions
    from claude_code_sdk.errors import CLIConnectionError
    hitl = HumanInTheLoopHandler(question_timeout_seconds=60.0)
    attempts = 0
    max_human_escalations = 2

    async def run_agent(user_guidance: str = ""):
        nonlocal attempts
        attempts += 1
        full_prompt = prompt
        if user_guidance:
            full_prompt = f"{prompt}\n\nAdditional note from the user: {user_guidance}"
        messages = []
        async for message in query(
            prompt=full_prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            messages.append(message)
        return messages

    last_error: Optional[Exception] = None
    for _ in range(max_human_escalations + 1):
        try:
            if last_error is None:
                return await run_agent()
            # Ask the human, retry with their guidance, and return that result
            return await hitl.handle_error(
                error=last_error,
                context={"prompt": prompt, "cwd": cwd, "attempts": attempts},
                retry_callback=run_agent
            )
        except (CLIConnectionError, TimeoutError) as e:
            last_error = e
    print("Maximum number of escalations reached. Aborting.")
    raise last_error
8. Structured Logging
Python: structlog
# logging_config.py
import structlog
import logging
from typing import Optional

def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None):
    """Configure structured logging for the agent."""
    processors = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ]
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )
    # Configure stdlib logging
    logging.basicConfig(
        format="%(message)s",
        level=getattr(logging, log_level.upper()),
        handlers=[
            logging.StreamHandler(),
            *([] if not log_file else [logging.FileHandler(log_file)])
        ]
    )
# agent_logger.py
import time
import uuid
import structlog
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage

logger = structlog.get_logger(__name__)

async def logged_agent_run(prompt: str, cwd: str):
    """Agent with full structured logging."""
    session_id = str(uuid.uuid4())[:8]
    log = logger.bind(
        session_id=session_id,
        cwd=cwd,
        prompt_length=len(prompt)
    )
    log.info("agent_start", prompt_preview=prompt[:100])
    tool_events = []
    start_time = time.monotonic()
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd)
        ):
            if isinstance(message, AssistantMessage):
                for block in message.content:
                    if hasattr(block, "name"):
                        # Log only the input keys, not the values, to keep logs small
                        tool_event = {
                            "tool": block.name,
                            "input_keys": list(getattr(block, "input", {}).keys())
                        }
                        tool_events.append(tool_event)
                        log.info("tool_use", **tool_event)
            elif isinstance(message, ResultMessage):
                duration = time.monotonic() - start_time
                log.info(
                    "agent_complete",
                    subtype=message.subtype,
                    duration_ms=message.duration_ms,
                    # The cost field name differs between SDK versions
                    cost_usd=getattr(message, "total_cost_usd", getattr(message, "cost_usd", None)),
                    num_tools=len(tool_events),
                    actual_duration_s=f"{duration:.2f}"
                )
    except Exception as e:
        duration = time.monotonic() - start_time
        log.error(
            "agent_error",
            error_type=type(e).__name__,
            error_message=str(e),
            duration_s=f"{duration:.2f}",
            num_tools_before_error=len(tool_events),
            exc_info=True
        )
        raise
TypeScript: pino
import { query, ClaudeCodeOptions, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-code-sdk";
import pino from "pino";
import { randomUUID } from "crypto";

const logger = pino({
  level: process.env.LOG_LEVEL ?? "info",
  transport: {
    target: "pino-pretty",
    options: {
      colorize: true,
      translateTime: "SYS:standard",
    },
  },
});

async function loggedAgentRun(prompt: string, cwd: string): Promise<unknown[]> {
  const sessionId = randomUUID().slice(0, 8);
  const log = logger.child({ sessionId, cwd });
  const messages: unknown[] = [];
  const toolEvents: { tool: string; timestamp: number }[] = [];
  const startTime = Date.now();
  log.info({ promptLength: prompt.length, promptPreview: prompt.slice(0, 100) }, "agent_start");
  try {
    for await (const message of query({
      prompt,
      options: { cwd } as ClaudeCodeOptions,
    })) {
      messages.push(message);
      if (message.type === "assistant") {
        const assistantMsg = message as AssistantMessage;
        for (const block of assistantMsg.message.content) {
          if (block.type === "tool_use") {
            toolEvents.push({ tool: block.name, timestamp: Date.now() });
            log.info({ tool: block.name, inputKeys: Object.keys(block.input) }, "tool_use");
          }
        }
      } else if (message.type === "result") {
        const resultMsg = message as ResultMessage;
        log.info(
          {
            subtype: resultMsg.subtype,
            durationMs: resultMsg.duration_ms,
            costUsd: resultMsg.total_cost_usd,
            numTools: toolEvents.length,
            wallTimeMs: Date.now() - startTime,
          },
          "agent_complete"
        );
      }
    }
    return messages;
  } catch (error) {
    log.error(
      {
        errorType: (error as Error).constructor.name,
        errorMessage: (error as Error).message,
        wallTimeMs: Date.now() - startTime,
        numToolsBeforeError: toolEvents.length,
      },
      "agent_error"
    );
    throw error;
  }
}
9. Error Observability
Sentry integration
import sentry_sdk
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from typing import Optional
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
from claude_code_sdk.errors import CLIConnectionError, APIError


def init_sentry(dsn: str, environment: str = "production"):
    """Initialize Sentry to capture agent errors.

    Sentry's docs recommend calling this from inside the running event
    loop so that AsyncioIntegration can instrument it.
    """
    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        integrations=[AsyncioIntegration()],
        traces_sample_rate=0.1,
        profiles_sample_rate=0.1,
    )


async def monitored_agent_run(prompt: str, cwd: str, user_id: Optional[str] = None):
    """Run the agent with full error monitoring."""
    with sentry_sdk.new_scope() as scope:
        # The environment is already set globally by init_sentry
        scope.set_tag("agent.type", "claude-code")
        if user_id:
            scope.set_user({"id": user_id})
        scope.set_context("agent", {
            "cwd": cwd,
            "prompt_length": len(prompt),
        })
        with sentry_sdk.start_transaction(
            op="agent.run",
            name=f"Claude Agent: {prompt[:50]}",
        ) as transaction:
            try:
                messages = []
                async for message in query(
                    prompt=prompt,
                    options=ClaudeCodeOptions(cwd=cwd),
                ):
                    messages.append(message)
                    if isinstance(message, ResultMessage):
                        transaction.set_measurement("cost_usd", message.total_cost_usd or 0)
                        transaction.set_measurement("duration_ms", message.duration_ms)
                # Mark success only after the stream has finished cleanly
                transaction.set_status("ok")
                return messages
            except CLIConnectionError as e:
                sentry_sdk.capture_exception(e)
                transaction.set_status("internal_error")
                raise
            except APIError as e:
                # Capture with the additional scope context
                sentry_sdk.capture_exception(e, scope=scope)
                transaction.set_status("unavailable")
                raise
            except Exception as e:
                sentry_sdk.capture_exception(e)
                transaction.set_status("unknown_error")
                raise


def track_error_rate(error: Exception, metric_name: str = "agent.error_rate"):
    """Record an error in the metrics system."""
    error_type = type(error).__name__
    # Connect your metrics system here (Datadog, Prometheus, etc.)
    print(f"METRIC: {metric_name}{{error_type={error_type}}} += 1")
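`track_error_rate` above only prints a line. As a minimal sketch of what could sit behind it before a real metrics backend is wired in, the following in-memory tracker counts runs and errors per exception type. The `ErrorRateTracker` class and its method names are hypothetical and not part of the SDK or of any metrics library.

```python
from collections import Counter
from threading import Lock


class ErrorRateTracker:
    """Hypothetical in-memory stand-in for a real metrics client."""

    def __init__(self) -> None:
        self._lock = Lock()
        self._errors: Counter = Counter()
        self._runs = 0

    def record_run(self) -> None:
        """Count one agent run, successful or not."""
        with self._lock:
            self._runs += 1

    def record_error(self, error: Exception) -> None:
        """Count one error, keyed by its exception class name."""
        with self._lock:
            self._errors[type(error).__name__] += 1

    def error_rate(self) -> float:
        """Return errors divided by runs, or 0.0 before any run."""
        with self._lock:
            if self._runs == 0:
                return 0.0
            return sum(self._errors.values()) / self._runs

    def by_type(self) -> dict:
        """Return a copy of the per-type error counts."""
        with self._lock:
            return dict(self._errors)
```

One way to use it is to call `record_run()` at the start of each agent run and `record_error(e)` in the `except` branches, then export `error_rate()` and `by_type()` periodically to the metrics system of your choice.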
10. Graceful Degradation
Fallback to a cheaper model
from claude_code_sdk import query, ClaudeCodeOptions
from claude_code_sdk.errors import APIError

# Ordered from most capable to cheapest
MODELS = [
    "claude-opus-4-5",    # Best, most expensive
    "claude-sonnet-4-5",  # Balanced
    "claude-haiku-4-5",   # Cheapest, fastest
]


async def query_with_model_fallback(
    prompt: str,
    cwd: str,
    preferred_model: str = "claude-opus-4-5",
):
    """Try the preferred model and fall back to cheaper ones on failure.

    Returns a (messages, model_used) tuple.
    """
    start_idx = MODELS.index(preferred_model) if preferred_model in MODELS else 0
    models_to_try = MODELS[start_idx:]
    last_error = None
    for model in models_to_try:
        try:
            print(f"Trying model: {model}")
            messages = []
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    cwd=cwd,
                    model=model,
                ),
            ):
                messages.append(message)
            if model != preferred_model:
                print(f"⚠️ Using fallback model: {model}")
            return messages, model
        except APIError as e:
            status = getattr(e, "status_code", 0)
            if status in (429, 503):
                # Rate limit or degraded service: try the next model
                print(f"Model {model} unavailable ({status}), trying the next one...")
                last_error = e
                continue
            else:
                raise  # Other errors are not model-specific, so propagate
    raise last_error or RuntimeError("All models failed")


async def query_with_cache_fallback(
    prompt: str,
    cwd: str,
    cache_key: str,
    cache: dict,
):
    """Return a cached response if the API fails."""
    try:
        messages = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(cwd=cwd),
        ):
            messages.append(message)
        # Cache only after the run has completed successfully
        cache[cache_key] = messages
        return messages
    except Exception as e:
        # Fall back to the cached response if one exists
        if cache_key in cache:
            print(f"⚠️ API failed ({e}), using cached response")
            return cache[cache_key]
        raise
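The plain `dict` passed to `query_with_cache_fallback` never expires its entries, so a fallback could serve an arbitrarily old answer. A minimal sketch of a cache with a maximum age follows. `StaleCache`, `CacheEntry`, and the injectable `clock` parameter are hypothetical names introduced for illustration, and the right maximum age depends on how quickly your repository changes.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class CacheEntry:
    value: Any
    stored_at: float  # clock reading when the value was stored


class StaleCache:
    """Hypothetical fallback cache that refuses entries older than max_age_seconds."""

    def __init__(
        self,
        max_age_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_age = max_age_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries: Dict[str, CacheEntry] = {}

    def put(self, key: str, value: Any) -> None:
        """Store a value together with the current clock reading."""
        self._entries[key] = CacheEntry(value=value, stored_at=self._clock())

    def get_if_fresh(self, key: str) -> Optional[Any]:
        """Return the value if present and not expired, else None."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        if self._clock() - entry.stored_at > self._max_age:
            # Expired: drop it so it cannot be served later
            del self._entries[key]
            return None
        return entry.value
```

In the fallback path, `cache.get_if_fresh(cache_key)` would replace the `cache_key in cache` check, and the original exception would be re-raised when it returns `None`.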
11. Complete Example: Ultra-Resilient Agent
This example combines all the patterns covered in this chapter:
# resilient_agent.py
import asyncio
import time
import uuid
import structlog
from dataclasses import dataclass, field
from typing import Optional, Callable
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
from claude_code_sdk.errors import CLINotFoundError, CLIConnectionError, APIError

logger = structlog.get_logger(__name__)


@dataclass
class ResilientAgentConfig:
    # Retry
    max_retries: int = 3
    initial_delay: float = 2.0
    max_delay: float = 120.0
    # Timeout
    total_timeout_seconds: float = 600.0
    tool_inactivity_timeout: float = 60.0
    # Circuit breaker
    circuit_failure_threshold: int = 5
    circuit_timeout_seconds: float = 120.0
    # Rollback
    enable_git_rollback: bool = True
    # Human-in-the-loop (HITL)
    enable_human_escalation: bool = True
    human_response_timeout: float = 120.0
    # Fallback models, ordered from most capable to cheapest
    models: list = field(default_factory=lambda: [
        "claude-opus-4-5",
        "claude-sonnet-4-5",
        "claude-haiku-4-5",
    ])
class ResilientAgent:
    def __init__(self, config: Optional[ResilientAgentConfig] = None):
        self.config = config or ResilientAgentConfig()
        self._circuit_state = "CLOSED"
        self._circuit_failures = 0
        self._circuit_last_failure: Optional[float] = None

    def _check_circuit(self):
        """Raise if the circuit is open; move to HALF_OPEN once the cooldown has passed."""
        if self._circuit_state == "OPEN":
            elapsed = time.time() - (self._circuit_last_failure or 0)
            if elapsed >= self.config.circuit_timeout_seconds:
                self._circuit_state = "HALF_OPEN"
                logger.info("circuit_half_open")
            else:
                remaining = self.config.circuit_timeout_seconds - elapsed
                raise RuntimeError(
                    f"Circuit breaker open. Retry in {remaining:.0f}s"
                )

    def _on_circuit_success(self):
        # Any success resets the counter, so only consecutive failures
        # can open the circuit
        self._circuit_failures = 0
        if self._circuit_state == "HALF_OPEN":
            self._circuit_state = "CLOSED"
            logger.info("circuit_closed")

    def _on_circuit_failure(self, error: Exception):
        self._circuit_failures += 1
        self._circuit_last_failure = time.time()
        # A failed probe in HALF_OPEN reopens the circuit immediately
        if (
            self._circuit_state == "HALF_OPEN"
            or self._circuit_failures >= self.config.circuit_failure_threshold
        ):
            if self._circuit_state != "OPEN":
                self._circuit_state = "OPEN"
                logger.warning("circuit_opened", failures=self._circuit_failures)
    async def _git_checkpoint(self, cwd: str) -> Optional[str]:
        """Record the current HEAD commit so a failed run can be rolled back."""
        if not self.config.enable_git_rollback:
            return None
        import subprocess
        try:
            # Run git in a worker thread so the event loop is not blocked
            result = await asyncio.to_thread(
                subprocess.run,
                ["git", "rev-parse", "HEAD"],
                cwd=cwd, capture_output=True, text=True, timeout=10,
            )
            if result.returncode == 0:
                checkpoint = result.stdout.strip()
                logger.info("git_checkpoint_created", commit=checkpoint[:8])
                return checkpoint
        except Exception as e:
            logger.warning("git_checkpoint_failed", error=str(e))
        return None

    async def _git_rollback(self, cwd: str, checkpoint: str):
        """Reset the working tree to the checkpoint commit.

        Caveat: `git reset --hard` also discards uncommitted changes that
        existed before the run, and it leaves untracked files in place.
        """
        if not checkpoint:
            return
        import subprocess
        try:
            await asyncio.to_thread(
                subprocess.run,
                ["git", "reset", "--hard", checkpoint],
                cwd=cwd, capture_output=True, text=True, timeout=30, check=True,
            )
            logger.info("git_rollback_complete", checkpoint=checkpoint[:8])
        except Exception as e:
            logger.error("git_rollback_failed", error=str(e))

    async def _ask_human(self, question: str) -> str:
        """Ask the operator on stdin; return "" on timeout or when disabled."""
        if not self.config.enable_human_escalation:
            return ""
        try:
            # get_running_loop() is the supported call inside a coroutine
            loop = asyncio.get_running_loop()
            print(f"\n🤔 The agent needs help:\n{question}\n> ", end="", flush=True)
            # Note: on timeout the worker thread stays blocked in input()
            response = await asyncio.wait_for(
                loop.run_in_executor(None, input, ""),
                timeout=self.config.human_response_timeout,
            )
            return response.strip()
        except asyncio.TimeoutError:
            logger.info("human_escalation_timeout")
            return ""
    async def run(
        self,
        prompt: str,
        cwd: str,
        on_message: Optional[Callable] = None,
    ) -> list:
        session_id = str(uuid.uuid4())[:8]
        log = logger.bind(session_id=session_id, cwd=cwd)
        log.info("resilient_agent_start")

        # 1. Verify the CLI is available
        import subprocess
        try:
            subprocess.run(["claude", "--version"], capture_output=True, timeout=5, check=True)
        except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
            log.error("cli_not_found")
            raise CLINotFoundError("Claude Code CLI is not installed")

        # 2. Create a git checkpoint
        checkpoint = await self._git_checkpoint(cwd)

        # 3. Run with retry + circuit breaker + timeout
        last_error = None
        model_idx = 0
        for attempt in range(self.config.max_retries + 1):
            try:
                self._check_circuit()
                model = self.config.models[min(model_idx, len(self.config.models) - 1)]
                if model_idx > 0:
                    log.warning("using_fallback_model", model=model, attempt=attempt)
                messages = []
                # asyncio.timeout requires Python 3.11+
                async with asyncio.timeout(self.config.total_timeout_seconds):
                    async for message in query(
                        prompt=prompt,
                        options=ClaudeCodeOptions(cwd=cwd, model=model),
                    ):
                        messages.append(message)
                        if on_message:
                            # Accept both sync and async callbacks
                            result = on_message(message)
                            if asyncio.iscoroutine(result):
                                await result
                self._on_circuit_success()
                log.info("resilient_agent_success", attempts=attempt + 1, model=model)
                return messages
            except CLINotFoundError:
                log.error("cli_not_found_during_run")
                raise  # Fatal, not retryable
            except asyncio.TimeoutError as e:
                last_error = e
                log.warning("timeout", attempt=attempt, timeout=self.config.total_timeout_seconds)
                self._on_circuit_failure(e)
                # Undo partial changes before the next attempt
                if checkpoint:
                    await self._git_rollback(cwd, checkpoint)
            except APIError as e:
                last_error = e
                status = getattr(e, "status_code", 0)
                self._on_circuit_failure(e)
                if status == 429:
                    # Fall back to 60s if the error carries no usable retry_after
                    retry_after = getattr(e, "retry_after", None) or 60
                    log.warning("rate_limit", retry_after=retry_after)
                    # Do not wait if there is no attempt left to use
                    if attempt < self.config.max_retries:
                        await asyncio.sleep(retry_after)
                    continue
                elif status in (500, 503):
                    model_idx += 1  # Try an alternative model
                elif status in (400, 401):
                    log.error("api_fatal_error", status=status)
                    raise
            except (CLIConnectionError, ConnectionError) as e:
                last_error = e
                self._on_circuit_failure(e)
                log.warning("connection_error", error=str(e), attempt=attempt)

            # Exponential backoff before the next attempt
            if attempt < self.config.max_retries:
                delay = min(
                    self.config.initial_delay * (2 ** attempt),
                    self.config.max_delay,
                )
                log.info("retry_delay", delay=delay, next_attempt=attempt + 2)
                await asyncio.sleep(delay)

        # All attempts exhausted: escalate to a human
        if self.config.enable_human_escalation and last_error:
            response = await self._ask_human(
                f"The agent failed after {self.config.max_retries + 1} attempts.\n"
                f"Last error: {last_error}\n\n"
                "Retry with additional instructions? (press Enter to give up)"
            )
            if response:
                return await self.run(
                    prompt=f"{prompt}\n\nAdditional instructions: {response}",
                    cwd=cwd,
                    on_message=on_message,
                )
        log.error("resilient_agent_failed", max_retries=self.config.max_retries)
        raise last_error or RuntimeError("Agent failed without a captured error")
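The retry loop in `ResilientAgent.run` uses a deterministic exponential delay, so many agents that fail at the same moment would also retry at the same moment. A minimal sketch of the "full jitter" variant follows, where each delay is drawn uniformly between zero and the exponential ceiling. The function names `backoff_ceiling` and `backoff_delay` are hypothetical, and the defaults simply mirror `ResilientAgentConfig`.

```python
import random
from typing import Optional


def backoff_ceiling(attempt: int, initial_delay: float = 2.0, max_delay: float = 120.0) -> float:
    """Upper bound for the delay before retrying after a 0-indexed attempt."""
    return min(initial_delay * (2 ** attempt), max_delay)


def backoff_delay(
    attempt: int,
    initial_delay: float = 2.0,
    max_delay: float = 120.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full jitter: a uniform random delay in [0, ceiling].

    Pass a seeded random.Random as `rng` to make the delays reproducible.
    """
    source = rng if rng is not None else random
    return source.uniform(0.0, backoff_ceiling(attempt, initial_delay, max_delay))
```

Inside the retry loop, `delay = backoff_delay(attempt, self.config.initial_delay, self.config.max_delay)` could replace the `min(...)` expression. The trade-off is that an individual retry may fire almost immediately, in exchange for spreading the load of many clients over the whole window.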
TypeScript: simplified version of the resilient agent
// resilientAgent.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
import pino from "pino";

const log = pino({ level: "info" });

interface ResilientConfig {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  timeoutMs: number;
  models: string[];
}

const DEFAULT_CONFIG: ResilientConfig = {
  maxRetries: 3,
  initialDelayMs: 2000,
  maxDelayMs: 120_000,
  timeoutMs: 300_000,
  models: ["claude-opus-4-5", "claude-sonnet-4-5", "claude-haiku-4-5"],
};

export async function runResilientAgent(
  prompt: string,
  cwd: string,
  config: Partial<ResilientConfig> = {}
): Promise<unknown[]> {
  const cfg = { ...DEFAULT_CONFIG, ...config };
  let lastError: Error | null = null;
  let modelIdx = 0;
  for (let attempt = 0; attempt <= cfg.maxRetries; attempt++) {
    const model = cfg.models[Math.min(modelIdx, cfg.models.length - 1)];
    const sessionLog = log.child({ attempt, model, cwd });
    try {
      sessionLog.info("attempt_start");
      const messages: unknown[] = [];
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), cfg.timeoutMs);
      try {
        for await (const message of query({
          prompt,
          // Assumes the SDK accepts an `abortController` option. Without it,
          // the timeout is only noticed when the next message arrives.
          options: { cwd, model, abortController: controller } as ClaudeCodeOptions,
        })) {
          if (controller.signal.aborted) throw new Error("Timeout");
          messages.push(message);
        }
        // The stream may end quietly after an abort, so check once more
        if (controller.signal.aborted) throw new Error("Timeout");
      } finally {
        clearTimeout(timeoutId);
      }
      sessionLog.info("attempt_success");
      return messages;
    } catch (error) {
      lastError = error as Error;
      sessionLog.warn({ error: lastError.message }, "attempt_failed");
      // Heuristic classification based on the error message text
      const errMsg = lastError.message.toLowerCase();
      if (errMsg.includes("not installed") || errMsg.includes("400") || errMsg.includes("401")) {
        throw lastError; // Fatal
      }
      if (errMsg.includes("500") || errMsg.includes("503")) {
        modelIdx++; // Try an alternative model
      }
      if (attempt < cfg.maxRetries) {
        const delay = Math.min(cfg.initialDelayMs * Math.pow(2, attempt), cfg.maxDelayMs);
        sessionLog.info({ delay }, "retry_scheduled");
        await new Promise((r) => setTimeout(r, delay));
      }
    }
  }
  throw lastError ?? new Error("Agent failed without a captured error");
}
Chapter Summary
mindmap
  root((Resilience))
    Error Types
      CLINotFoundError
        Fatal
      CLIConnectionError
        Retryable
      APIError
        429 Rate Limit
        500 Server
        401 Fatal
      TimeoutError
        Cleanup
        Rollback
    Patterns
      Retry
        Exponential Backoff
        Jitter
        tenacity / p-retry
      Circuit Breaker
        CLOSED OPEN HALF-OPEN
        Protects a downed service
      Timeout
        asyncio.wait_for
        AbortController
        Per tool
      Rollback
        Git checkpoint
        File backup
        Transactional
    Escalation
      Human-in-the-Loop
      Structured logs
      Sentry observability
    Graceful Degradation
      Alternative model
      Cache fallback
      Reduced mode
With these patterns, your agents can handle network failures, API errors, timeouts, and unexpected model behavior, and in most cases recover without human intervention. Resilience is the difference between a prototype and a production system.