Capítulo 20: SDK Internals y Extensiones

Por: Artiko
claudeagent-sdkinternalsextensioncontribucion

Capítulo 20: SDK Internals y Extensiones

Entender cómo funciona el SDK por dentro no solo satisface la curiosidad técnica — permite debuggear problemas difíciles, crear extensiones robustas y aprovechar características no documentadas. Este capítulo abre la caja negra del Claude Code SDK.


1. Cómo Funciona el SDK Internamente

El SDK Spawna claude -p como Subproceso

La arquitectura más sorprendente del SDK es que no llama directamente a la API de Anthropic. En cambio, spawna el binario claude (Claude Code CLI) como un subproceso hijo y se comunica con él mediante JSON sobre stdio.

flowchart LR
    subgraph TuApp["Tu Aplicación"]
        PythonCode["Tu código\nPython/TS"]
        SDK["Claude Code SDK\nquery()"]
    end

    subgraph Proceso["Proceso Hijo"]
        CLI["claude -p\nClaude Code CLI"]
        subgraph CLIInternal["Dentro del CLI"]
            AnthAPI["Cliente API\nAnthropic"]
            ToolExec["Ejecutor\nde Herramientas"]
            AgentLoop["Agent Loop"]
        end
    end

    subgraph External["Externos"]
        AnthServer["Anthropic\nAPI Server"]
        FileSystem["Sistema\nde Archivos"]
        Terminal["Terminal\n(Bash)"]
    end

    PythonCode --> SDK
    SDK --> |"spawn subprocess\nstdin/stdout"| CLI
    CLI --> AnthAPI
    AnthAPI --> |"HTTPS"| AnthServer
    AnthServer --> AnthAPI
    AnthAPI --> AgentLoop
    AgentLoop --> ToolExec
    ToolExec --> FileSystem
    ToolExec --> Terminal
    CLI --> |"JSON messages\nstdout"| SDK
    SDK --> PythonCode

Protocolo de Comunicación: JSON Lines sobre Stdio

La comunicación entre el SDK y el proceso claude es mediante JSON Lines (un objeto JSON por línea) sobre stdio:

stdin → tu código envía el prompt como JSON
stdout ← el CLI emite eventos como JSON lines
stderr ← logs y errores del CLI (no se parsean)

Formato del mensaje de entrada (stdin):

{
  "prompt": "Encuentra el bug en auth.py",
  "options": {
    "allowed_tools": ["Read", "Edit", "Bash"],
    "max_turns": 50,
    "model": "claude-opus-4-5",
    "system_prompt": "Eres un experto en seguridad"
  }
}

Mensajes de salida (stdout) — JSON Lines:

{"type": "system", "subtype": "init", "session_id": "abc123", "tools": [...]}
{"type": "assistant", "message": {"role": "assistant", "content": [{"type": "text", "text": "Analizando auth.py..."}]}}
{"type": "assistant", "message": {"role": "assistant", "content": [{"type": "tool_use", "id": "tu_001", "name": "Read", "input": {"file_path": "auth.py"}}]}}
{"type": "tool", "tool_use_id": "tu_001", "tool_name": "Read", "content": "...contenido del archivo..."}
{"type": "result", "subtype": "success", "result": "El bug está en la línea 42...", "session_id": "abc123", "cost_usd": 0.0234}

Parsing del Output del Subproceso

sequenceDiagram
    participant SDK
    participant Claude as claude -p

    SDK->>Claude: spawn(["claude", "-p"], stdin=PIPE, stdout=PIPE)
    SDK->>Claude: stdin.write(json_prompt + "\n")
    SDK->>Claude: stdin.close()

    loop Streaming de mensajes
        Claude->>SDK: stdout.readline() → JSON line
        SDK->>SDK: json.loads(line)
        SDK->>SDK: yield Message object al caller
    end

    Claude->>SDK: EOF en stdout
    SDK->>SDK: Cerrar proceso

Implementación simplificada del SDK (conceptual):

# sdk_internals_explained.py
# Este código es CONCEPTUAL para entender el SDK, no el código real

import asyncio
import json
import subprocess
from typing import AsyncGenerator, Any


async def query_internal(
    prompt: str,
    allowed_tools: list[str],
    max_turns: int
) -> AsyncGenerator[dict, None]:
    """
    Implementación conceptual de cómo funciona query() internamente.
    El SDK real es más complejo y robusto.
    """

    # 1. Preparar el comando
    cmd = ["claude", "-p", "--output-format", "stream-json", "--verbose"]

    if max_turns:
        cmd.extend(["--max-turns", str(max_turns)])

    for tool in allowed_tools:
        cmd.extend(["--allowedTools", tool])

    # 2. Spawnar el subproceso con stdin/stdout pipes
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # 3. Enviar el prompt al subproceso via stdin
    prompt_json = json.dumps({"prompt": prompt}) + "\n"
    process.stdin.write(prompt_json.encode())
    await process.stdin.drain()
    process.stdin.close()

    # 4. Leer y parsear JSON Lines del stdout
    async for raw_line in process.stdout:
        line = raw_line.decode().strip()
        if not line:
            continue
        try:
            message = json.loads(line)
            yield message
        except json.JSONDecodeError:
            # El SDK real maneja estos errores gracefully
            continue

    # 5. Esperar que el proceso termine
    await process.wait()

Por Qué Subproceso en vez de API Directa

La razón de este diseño es fundamental: Claude Code tiene capacidades que van mucho más allá de una API conversacional simple:

  1. Gestión de sesiones persistentes: El CLI mantiene el estado entre llamadas
  2. Sistema de herramientas complejo: Read, Write, Edit, Bash, Browser, etc. están implementados en el CLI
  3. MCP (Model Context Protocol): El CLI gestiona las conexiones MCP
  4. Hooks: Los hooks de PreToolUse, PostToolUse se ejecutan en el proceso del CLI
  5. Seguridad de herramientas: El CLI aplica sandboxing y validaciones
  6. Formato de salida: El CLI parsea las respuestas del LLM y extrae tool calls

Esto significa que el SDK es esencialmente un thin wrapper alrededor del CLI que:

Manejo del stdout/stderr

# entendiendo_stderr.py
import asyncio
import json
import subprocess
import sys


async def run_claude_with_debug(prompt: str) -> None:
    """
    Muestra la diferencia entre stdout (mensajes del SDK) y stderr (logs del CLI).
    """
    process = await asyncio.create_subprocess_exec(
        "claude", "-p",
        "--output-format", "stream-json",
        "--verbose",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    process.stdin.write(prompt.encode() + b"\n")
    await process.stdin.drain()
    process.stdin.close()

    # Leer stdout y stderr concurrentemente
    async def read_stdout():
        async for line in process.stdout:
            decoded = line.decode().strip()
            if decoded:
                try:
                    msg = json.loads(decoded)
                    # stdout = mensajes estructurados del agente
                    print(f"[MENSAJE SDK] tipo={msg.get('type')} subtipo={msg.get('subtype', '')}")
                except json.JSONDecodeError:
                    print(f"[STDOUT RAW] {decoded}")

    async def read_stderr():
        async for line in process.stderr:
            decoded = line.decode().strip()
            if decoded:
                # stderr = logs del CLI (útil para debugging del CLI mismo)
                print(f"[CLI LOG] {decoded}", file=sys.stderr)

    await asyncio.gather(read_stdout(), read_stderr())
    await process.wait()
    print(f"[PROCESO] Código de salida: {process.returncode}")

2. Estructura del Repositorio del SDK

Repositorios Oficiales

flowchart LR
    subgraph Repos["Repositorios GitHub"]
        PyRepo["anthropics/claude-code-sdk-python\ngithub.com/anthropics/claude-code-sdk-python"]
        JSRepo["anthropics/claude-code-sdk-js\ngithub.com/anthropics/claude-code-sdk-js"]
        CLI["anthropics/claude-code\ngithub.com/anthropics/claude-code\n(el binario que el SDK spawna)"]
    end

Módulos Principales del SDK Python

claude_code_sdk/
├── __init__.py          # Exports públicos: query, ClaudeCodeOptions, tipos
├── _client.py           # ClaudeSDKClient - clase principal del cliente
├── _process.py          # ProcessManager - gestión del subproceso
├── _messages.py         # Tipos de mensajes (AssistantMessage, ResultMessage, etc.)
├── _errors.py           # Excepciones del SDK
└── py.typed             # Marker file para type checking

Módulos principales del SDK TypeScript:

src/
├── index.ts             # Exports públicos
├── client.ts            # ClaudeSDKClient
├── process.ts           # Gestión del subproceso Node.js
├── types.ts             # Interfaces TypeScript
└── errors.ts            # Clases de error

Dependencias del SDK

Python (pyproject.toml):

[project]
name = "claude-code-sdk"
dependencies = [
    "anyio>=4.0.0",       # Async I/O compatible con asyncio y trio
]
# Solo dependencia: anyio para async subprocess
# Sin HTTP client (no llama directamente a la API)

TypeScript (package.json):

{
  "dependencies": {
    "execa": "^9.0.0"
  }
}

Las dependencias son intencionalmente mínimas porque el SDK no hace networking — solo gestiona subprocesos y parsea JSON.


3. ClaudeSDKClient — Uso Avanzado

Diferencia entre query() y ClaudeSDKClient

flowchart LR
    subgraph query["query() — Simple"]
        q1["Una query\nUn proceso\nUna sesión\nSe cierra al terminar"]
    end

    subgraph client["ClaudeSDKClient — Avanzado"]
        c1["Múltiples queries\nMismo proceso (posible)\nSesión persistente\nControl explícito de lifecycle"]
    end

    query --> |"Para la mayoría\nde los casos"| UseQuery["Usar query()"]
    client --> |"Para sesiones largas\nde larga duración"| UseClient["Usar ClaudeSDKClient"]

Uso Completo del Client

# claude_client_advanced.py
import asyncio
from claude_code_sdk import ClaudeCodeOptions

# Importación del client (verificar en la versión actual del SDK)
# from claude_code_sdk._client import ClaudeSDKClient


async def demo_client_usage():
    """
    Demonstración de ClaudeSDKClient para sesiones de larga duración.

    El Client es útil cuando:
    1. Tienes múltiples prompts relacionados en una misma sesión
    2. Necesitas control explícito sobre cuándo empieza/termina la sesión
    3. Quieres reusar el contexto entre queries
    """

    options = ClaudeCodeOptions(
        allowed_tools=["Read", "Edit", "Bash"],
        max_turns=50,
        system_prompt="Eres un asistente de desarrollo especializado en Python."
    )

    # Context manager gestiona el lifecycle del proceso
    # async with ClaudeSDKClient(options=options) as client:
    #     # Primera query en la sesión
    #     async for message in client.process_query("¿Qué archivos Python hay?"):
    #         if hasattr(message, 'content'):
    #             print(message.content)
    #
    #     # Segunda query — MISMO proceso, MISMO contexto
    #     async for message in client.process_query("Analiza el más grande"):
    #         if hasattr(message, 'content'):
    #             print(message.content)
    #
    # # Al salir del context manager, el proceso se cierra limpiamente


async def when_to_use_client_vs_query():
    """
    Guía de cuándo usar query() vs Client.
    """

    # ✅ USA query() para:
    # - Tareas independientes (no necesitan contexto de la anterior)
    # - Código más simple y directo
    # - Cuando cada tarea es completamente diferente

    from claude_code_sdk import query

    # Tarea 1 completamente independiente
    async for msg in query(
        "Lista los archivos en src/",
        options=ClaudeCodeOptions(allowed_tools=["Bash"])
    ):
        pass

    # Tarea 2 completamente independiente
    async for msg in query(
        "¿Cuántas líneas tiene el archivo README.md?",
        options=ClaudeCodeOptions(allowed_tools=["Bash"])
    ):
        pass

    # ✅ USA ClaudeSDKClient cuando:
    # - Las tareas son secuenciales y relacionadas
    # - La segunda tarea depende del resultado de la primera
    # - Quieres mantener el historial de la conversación
    # Ejemplo hipotético (verificar API actual del SDK):
    # async with ClaudeSDKClient(...) as client:
    #     await client.query("Lee auth.py y entiende la estructura")
    #     await client.query("Ahora agrega validación de JWT")  # Sabe sobre auth.py

Context Manager y Cierre Seguro

# safe_client_usage.py
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions


async def safe_long_running_agent(tasks: list[str]) -> list[str]:
    """
    Para sesiones largas, usar query() con session_id para reanudar.
    Este es el patrón recomendado para la mayoría de los casos.
    """
    results = []
    session_id = None

    for task in tasks:
        task_results = []

        async for message in query(
            prompt=task,
            options=ClaudeCodeOptions(
                resume=session_id,  # Reanudar sesión si existe
                allowed_tools=["Read", "Edit", "Bash"],
                max_turns=30
            )
        ):
            # Capturar session_id del primer mensaje
            if hasattr(message, 'session_id') and message.session_id:
                session_id = message.session_id

            if hasattr(message, 'content') and message.content:
                task_results.append(str(message.content))

        results.append("\n".join(task_results))
        print(f"Tarea completada. Session ID: {session_id}")

    return results

4. Tipos TypeScript en Profundidad

Interfaces Completas del SDK

// sdk-types-deep.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

// ============================================================
// Tipos de mensajes del SDK
// ============================================================

interface BaseMessage {
  type: string;
}

interface SystemMessage extends BaseMessage {
  type: "system";
  subtype: "init" | "result";
  session_id?: string;
}

interface AssistantMessage extends BaseMessage {
  type: "assistant";
  message: {
    role: "assistant";
    content: Array<TextBlock | ToolUseBlock>;
  };
}

interface TextBlock {
  type: "text";
  text: string;
}

interface ToolUseBlock {
  type: "tool_use";
  id: string;
  name: string;
  input: Record<string, unknown>;
}

interface UserMessage extends BaseMessage {
  type: "user";
  message: {
    role: "user";
    content: Array<ToolResultBlock>;
  };
}

interface ToolResultBlock {
  type: "tool_result";
  tool_use_id: string;
  content: string;
  is_error?: boolean;
}

interface ResultMessage extends BaseMessage {
  type: "result";
  subtype: "success" | "error";
  result?: string;
  session_id?: string;
  cost_usd?: number;
  duration_ms?: number;
  num_turns?: number;
  usage?: {
    input_tokens: number;
    output_tokens: number;
    cache_creation_input_tokens?: number;
    cache_read_input_tokens?: number;
  };
}

// Union type de todos los mensajes posibles
type SDKMessage =
  | SystemMessage
  | AssistantMessage
  | UserMessage
  | ResultMessage;

// ============================================================
// Type Guards para mensajes
// ============================================================

function isResultMessage(msg: SDKMessage): msg is ResultMessage {
  return msg.type === "result";
}

function isAssistantMessage(msg: SDKMessage): msg is AssistantMessage {
  return msg.type === "assistant";
}

function hasTextContent(
  msg: SDKMessage
): msg is AssistantMessage & { message: { content: [TextBlock, ...unknown[]] } } {
  if (!isAssistantMessage(msg)) return false;
  return msg.message.content.some((block) => block.type === "text");
}

function isToolUse(msg: SDKMessage): boolean {
  if (!isAssistantMessage(msg)) return false;
  return msg.message.content.some((block) => block.type === "tool_use");
}

// ============================================================
// Generics para herramientas custom
// ============================================================

interface TypedToolInput<T extends Record<string, unknown>> {
  name: string;
  input: T;
}

interface BashToolInput extends Record<string, unknown> {
  command: string;
  timeout?: number;
}

interface ReadToolInput extends Record<string, unknown> {
  file_path: string;
  offset?: number;
  limit?: number;
}

function extractToolCall<T extends Record<string, unknown>>(
  msg: AssistantMessage,
  toolName: string
): TypedToolInput<T> | null {
  for (const block of msg.message.content) {
    if (block.type === "tool_use" && block.name === toolName) {
      return {
        name: block.name,
        input: block.input as T,
      };
    }
  }
  return null;
}

// ============================================================
// Utility types para el SDK
// ============================================================

// Extraer el tipo del costo de ResultMessage
type CostInfo = Pick<ResultMessage, "cost_usd" | "usage">;

// Tipo para una función que procesa mensajes del SDK
type MessageProcessor<T> = (message: SDKMessage) => T | undefined;

// Tipo para agregar resultados
type ResultAggregator<T> = (
  messages: SDKMessage[]
) => T;

// Helper type: extraer solo mensajes de resultado
type OnlyResults = Extract<SDKMessage, { type: "result" }>;

// ============================================================
// Uso con tipos estrictos
// ============================================================

async function typedAgentRunner(
  prompt: string,
  tools: string[]
): Promise<{
  finalResult: string | undefined;
  cost: CostInfo | undefined;
  toolCalls: Array<{ name: string; input: Record<string, unknown> }>;
}> {
  const messages: SDKMessage[] = [];
  const toolCalls: Array<{ name: string; input: Record<string, unknown> }> = [];

  for await (const message of query(prompt, {
    allowedTools: tools,
  } as ClaudeCodeOptions)) {
    const typed = message as SDKMessage;
    messages.push(typed);

    if (isAssistantMessage(typed)) {
      for (const block of typed.message.content) {
        if (block.type === "tool_use") {
          toolCalls.push({ name: block.name, input: block.input });
        }
      }
    }
  }

  const resultMsg = messages.find(isResultMessage);

  return {
    finalResult: resultMsg?.result,
    cost: resultMsg
      ? { cost_usd: resultMsg.cost_usd, usage: resultMsg.usage }
      : undefined,
    toolCalls,
  };
}

export {
  SDKMessage,
  ResultMessage,
  AssistantMessage,
  isResultMessage,
  isAssistantMessage,
  hasTextContent,
  isToolUse,
  extractToolCall,
  typedAgentRunner,
  BashToolInput,
  ReadToolInput,
};

5. Extender el SDK

Wrapper sobre query() con Comportamiento Adicional

# enterprise_sdk.py
"""
Wrapper empresarial sobre el Claude Code SDK con:
- Autenticación y autorización
- Logging estructurado automático
- Retry con backoff exponencial
- Rate limiting
- Auditoría de acciones
"""
import asyncio
import time
import structlog
from typing import AsyncGenerator, Any
from functools import wraps
from tenacity import (
    AsyncRetrying,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from claude_code_sdk import query, ClaudeCodeOptions

logger = structlog.get_logger()


class EnterpriseAgentSDK:
    """
    Wrapper empresarial sobre el SDK con features adicionales.
    Principio Open/Closed: extiende sin modificar el SDK original.
    """

    def __init__(
        self,
        api_key_validator: Any | None = None,
        rate_limiter: Any | None = None,
        audit_trail: Any | None = None,
        max_retries: int = 3,
        allowed_tools_policy: dict[str, list[str]] | None = None
    ):
        """
        Args:
            api_key_validator: Validador de API keys de usuario
            rate_limiter: Rate limiter por usuario
            audit_trail: Sistema de audit trail
            max_retries: Número máximo de reintentos en error transitorio
            allowed_tools_policy: Política de herramientas por rol de usuario
        """
        self._api_key_validator = api_key_validator
        self._rate_limiter = rate_limiter
        self._audit_trail = audit_trail
        self._max_retries = max_retries
        self._allowed_tools_policy = allowed_tools_policy or {
            "admin": ["Read", "Edit", "Write", "Bash"],
            "developer": ["Read", "Edit", "Bash"],
            "readonly": ["Read", "Bash"],
        }

    def _get_tools_for_role(self, user_role: str, requested_tools: list[str]) -> list[str]:
        """Aplica política de herramientas según el rol del usuario."""
        allowed = set(self._allowed_tools_policy.get(user_role, ["Read"]))
        return [t for t in requested_tools if t in allowed]

    async def query(
        self,
        prompt: str,
        user_id: str,
        user_role: str = "developer",
        options: ClaudeCodeOptions | None = None,
        metadata: dict | None = None
    ) -> AsyncGenerator[Any, None]:
        """
        Wrapper sobre query() con todas las features empresariales.
        """
        log = logger.bind(user_id=user_id, user_role=user_role)

        # 1. Validar autorización
        if self._api_key_validator:
            authorized = await self._api_key_validator.validate(user_id)
            if not authorized:
                raise PermissionError(f"Usuario {user_id} no autorizado")

        # 2. Aplicar rate limiting
        if self._rate_limiter:
            allowed = await self._rate_limiter.check(user_id)
            if not allowed:
                raise RuntimeError(f"Rate limit excedido para usuario {user_id}")

        # 3. Aplicar política de herramientas
        if options and options.allowed_tools:
            safe_tools = self._get_tools_for_role(user_role, options.allowed_tools)
            if safe_tools != options.allowed_tools:
                log.warning(
                    "tools_restricted_by_policy",
                    requested=options.allowed_tools,
                    allowed=safe_tools
                )
            options = ClaudeCodeOptions(
                allowed_tools=safe_tools,
                max_turns=getattr(options, 'max_turns', 50),
                system_prompt=getattr(options, 'system_prompt', None),
            )

        # 4. Audit trail: registrar inicio
        if self._audit_trail:
            await self._audit_trail.append(
                user_id=user_id,
                session_id=None,
                agent_name="enterprise_agent",
                action_type="query_started",
                action_details={"prompt_preview": prompt[:200], "metadata": metadata or {}}
            )

        log.info("enterprise_query_started", prompt_preview=prompt[:100])

        # 5. Ejecutar con retry
        async def _execute():
            async for message in query(prompt, options):
                yield message

        async with AsyncRetrying(
            stop=stop_after_attempt(self._max_retries),
            wait=wait_exponential(multiplier=1, min=4, max=60),
            retry=retry_if_exception_type((RuntimeError, TimeoutError)),
            reraise=True
        ):
            async for message in _execute():
                yield message

        log.info("enterprise_query_completed")


# ============================================================
# Plugin system para el SDK
# ============================================================

class SDKPlugin:
    """Interface base para plugins del SDK."""

    async def on_message(self, message: Any) -> Any:
        """Procesar cada mensaje. Retornar el mensaje (posiblemente modificado)."""
        return message

    async def on_tool_call(self, tool_name: str, tool_input: dict) -> dict | None:
        """Interceptar llamadas a herramientas. None = no modificar."""
        return None

    async def on_query_complete(self, result: str, cost_usd: float) -> None:
        """Ejecutar al completar una query."""
        pass


class CostBudgetPlugin(SDKPlugin):
    """Plugin que detiene la ejecución si se excede el presupuesto."""

    def __init__(self, max_cost_usd: float):
        self.max_cost_usd = max_cost_usd
        self.accumulated_cost = 0.0

    async def on_message(self, message: Any) -> Any:
        if hasattr(message, 'cost_usd') and message.cost_usd:
            self.accumulated_cost += float(message.cost_usd)
            if self.accumulated_cost > self.max_cost_usd:
                raise RuntimeError(
                    f"Presupuesto excedido: ${self.accumulated_cost:.4f} > ${self.max_cost_usd}"
                )
        return message


class PluggableAgentRunner:
    """Runner que aplica plugins en cada mensaje."""

    def __init__(self, plugins: list[SDKPlugin] | None = None):
        self.plugins = plugins or []

    async def run(self, prompt: str, options: ClaudeCodeOptions | None = None) -> str:
        results = []

        async for message in query(prompt, options):
            # Aplicar plugins
            processed = message
            for plugin in self.plugins:
                processed = await plugin.on_message(processed)

            if hasattr(processed, 'content') and processed.content:
                results.append(str(processed.content))

        return "\n".join(results)

Middleware Pattern para Hooks

// middleware-pattern.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

type MessageMiddleware = (
  message: unknown,
  next: (msg: unknown) => Promise<void>
) => Promise<void>;

class AgentPipeline {
  private middlewares: MessageMiddleware[] = [];

  use(middleware: MessageMiddleware): this {
    this.middlewares.push(middleware);
    return this;
  }

  private buildStack(): (msg: unknown) => Promise<void> {
    const stack = [...this.middlewares].reverse();
    return stack.reduce(
      (next, middleware) => (msg) => middleware(msg, next),
      async (_msg: unknown) => {}
    );
  }

  async run(prompt: string, options?: ClaudeCodeOptions): Promise<string[]> {
    const results: string[] = [];
    const processMessage = this.buildStack();

    for await (const message of query(prompt, options ?? ({} as ClaudeCodeOptions))) {
      await processMessage(message);
      if ("content" in (message as object)) {
        const content = String((message as { content: unknown }).content);
        if (content) results.push(content);
      }
    }

    return results;
  }
}

// Middlewares reutilizables
const loggingMiddleware: MessageMiddleware = async (message, next) => {
  const type = (message as { type?: string }).type;
  console.log(`[LOG] Message type: ${type}`);
  await next(message);
};

const costTrackingMiddleware = (budget: number): MessageMiddleware => {
  let accumulated = 0;
  return async (message, next) => {
    if ("cost_usd" in (message as object)) {
      accumulated += Number((message as { cost_usd: unknown }).cost_usd) || 0;
      if (accumulated > budget) {
        throw new Error(`Budget exceeded: $${accumulated.toFixed(4)}`);
      }
    }
    await next(message);
  };
};

// Uso
async function runWithMiddleware(prompt: string): Promise<string[]> {
  const pipeline = new AgentPipeline()
    .use(loggingMiddleware)
    .use(costTrackingMiddleware(1.0)); // Max $1.00

  return pipeline.run(prompt, {
    allowedTools: ["Read", "Bash"],
  } as ClaudeCodeOptions);
}

export { AgentPipeline, loggingMiddleware, costTrackingMiddleware };

6. Contribuir al SDK

Proceso de Contribución

flowchart TD
    Issue["Crear/buscar Issue\nen GitHub"] --> Fork["Fork del repo"]
    Fork --> Branch["Crear branch\nfeat/mi-feature o fix/mi-bug"]
    Branch --> Code["Implementar cambios"]
    Code --> Tests["Agregar/actualizar tests"]
    Tests --> Lint["Pasar linters\nruff, mypy, pytest"]
    Lint --> PR["Abrir Pull Request"]
    PR --> Review["Code Review\npor maintainers"]
    Review --> |"Cambios solicitados"| Code
    Review --> |"Aprobado"| Merge["Merge a main"]

Setup para Desarrollo Local

# Clonar el repo del SDK Python
git clone https://github.com/anthropics/claude-code-sdk-python
cd claude-code-sdk-python

# Instalar en modo desarrollo
pip install -e ".[dev]"

# Verificar que los tests pasan
pytest tests/ -v

# Correr linters
ruff check .
mypy .

# Para el SDK TypeScript
git clone https://github.com/anthropics/claude-code-sdk-js
cd claude-code-sdk-js
npm install
npm test
npm run typecheck
npm run lint

Testing Local del SDK Modificado

# test_local_sdk.py
"""
Cómo testear cambios al SDK localmente antes de abrir un PR.
"""
import asyncio
import sys

# Agregar el SDK local al path (para probar tu versión modificada)
# sys.path.insert(0, "/ruta/a/tu/fork/claude-code-sdk-python")

from claude_code_sdk import query, ClaudeCodeOptions


async def test_modification():
    """Test básico para verificar que tu modificación funciona."""
    results = []

    async for message in query(
        "Di hola",
        options=ClaudeCodeOptions(
            allowed_tools=[],
            max_turns=1
        )
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))

    assert len(results) > 0, "El SDK debería retornar al menos un mensaje"
    print("✓ Test pasó")


asyncio.run(test_modification())

Cómo Reportar Bugs

## Título del Issue
Describe el problema en una línea clara

## Comportamiento esperado
Lo que debería pasar

## Comportamiento actual
Lo que está pasando

## Reproducción mínima
```python
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions

async def reproduce():
    async for msg in query("prompt que causa el bug", options=ClaudeCodeOptions()):
        print(msg)

asyncio.run(reproduce())

Versión del SDK

claude-code-sdk: X.Y.Z Python: 3.12.X OS: Ubuntu 22.04

Stack trace

...

---

## 7. Changelog y Migración entre Versiones

### Cambios Breaking entre Versiones

```mermaid
timeline
    title Evolución del SDK
    0.0.1 : Lanzamiento inicial
           : query() básica
           : ClaudeCodeOptions

    0.0.5 : Soporte MCP
          : Herramientas personalizadas
          : Mejoras en tipos

    0.0.10 : Hooks PreToolUse PostToolUse
           : session_id para reanudación
           : cost_usd en ResultMessage

    0.0.14+ : ClaudeSDKClient
            : Mejoras de rendimiento
            : Type hints mejorados

Guía de Migración

# migration_guide.py

# ============================================================
# ANTES (versiones antiguas):
# ============================================================

# Importación antigua (si el paquete se llamaba diferente)
# from claude_agent_sdk import query, ClaudeAgentOptions

# ============================================================
# AHORA (versión actual):
# ============================================================

# Importación correcta
from claude_code_sdk import query, ClaudeCodeOptions

# Cambio: ClaudeAgentOptions → ClaudeCodeOptions
# Antes:
# options = ClaudeAgentOptions(allowed_tools=["Read"])
# Ahora:
options = ClaudeCodeOptions(allowed_tools=["Read"])

# Cambio: campo max_turns (verificar nombre exacto en la versión actual)
options_modern = ClaudeCodeOptions(
    allowed_tools=["Read", "Bash"],
    max_turns=50,
)

# ============================================================
# Manejo de Deprecations
# ============================================================

import warnings


def safe_create_options(**kwargs) -> ClaudeCodeOptions:
    """
    Wrapper que maneja campos deprecados gracefully.
    """
    # Filtrar campos que ya no existen
    known_fields = {"allowed_tools", "max_turns", "system_prompt", "resume", "model"}
    unknown = {k: v for k, v in kwargs.items() if k not in known_fields}

    if unknown:
        warnings.warn(
            f"Campos desconocidos en ClaudeCodeOptions serán ignorados: {list(unknown.keys())}",
            DeprecationWarning,
            stacklevel=2
        )

    safe_kwargs = {k: v for k, v in kwargs.items() if k in known_fields}
    return ClaudeCodeOptions(**safe_kwargs)

8. Interoperabilidad

Comparación Final de Frameworks

flowchart LR
    subgraph CCS["Claude Code SDK"]
        direction TB
        CCS1["✓ Acceso completo\na herramientas de Claude Code"]
        CCS2["✓ MCP nativo"]
        CCS3["✓ Hooks de herramientas"]
        CCS4["✗ Solo Claude"]
        CCS5["✗ Sin orquestación\ncompleja incluida"]
    end

    subgraph LC["LangChain"]
        direction TB
        LC1["✓ Multi-modelo"]
        LC2["✓ Ecosistema enorme"]
        LC3["✓ Chains y agents"]
        LC4["✗ Overhead de abstracción"]
        LC5["✗ No acceso a\nherramientas de Claude Code"]
    end

    subgraph CA["CrewAI"]
        direction TB
        CA1["✓ Multi-agente\nout of the box"]
        CA2["✓ Roles y tareas\nbien definidas"]
        CA3["✗ Solo para Claude/OpenAI"]
        CA4["✗ Menos control fino"]
    end

Adapter Pattern: Cambiar de Framework

# adapter_pattern.py
"""
Adapter que permite cambiar de Claude Code SDK a otro framework
sin cambiar el código de negocio.
"""
from abc import ABC, abstractmethod
from typing import AsyncGenerator


class AgentAdapter(ABC):
    """Interface común para todos los frameworks de agentes."""

    @abstractmethod
    async def run(self, prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
        """Ejecutar el agente y retornar tokens/chunks del resultado."""
        pass

    @abstractmethod
    async def run_complete(self, prompt: str, tools: list[str]) -> str:
        """Ejecutar el agente y retornar el resultado completo."""
        pass


class ClaudeCodeAdapter(AgentAdapter):
    """Adapter para el Claude Code SDK."""

    async def run(self, prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
        from claude_code_sdk import query, ClaudeCodeOptions

        async for message in query(
            prompt,
            options=ClaudeCodeOptions(allowed_tools=tools)
        ):
            if hasattr(message, 'content') and message.content:
                yield str(message.content)

    async def run_complete(self, prompt: str, tools: list[str]) -> str:
        chunks = []
        async for chunk in self.run(prompt, tools):
            chunks.append(chunk)
        return "\n".join(chunks)


class MockAdapter(AgentAdapter):
    """Adapter mock para tests."""

    def __init__(self, responses: list[str]):
        self._responses = iter(responses)

    async def run(self, prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
        try:
            response = next(self._responses)
            yield response
        except StopIteration:
            yield "Mock response"

    async def run_complete(self, prompt: str, tools: list[str]) -> str:
        chunks = []
        async for chunk in self.run(prompt, tools):
            chunks.append(chunk)
        return "\n".join(chunks)


# Código de negocio que usa el adapter (no sabe qué framework usa)
class CodeAnalysisService:
    def __init__(self, agent: AgentAdapter):
        self._agent = agent

    async def analyze_file(self, file_path: str) -> str:
        return await self._agent.run_complete(
            prompt=f"Analiza el archivo {file_path} y encuentra problemas",
            tools=["Read", "Bash"]
        )


# Cambiar entre implementaciones sin tocar el servicio
async def main():
    # En producción
    service = CodeAnalysisService(agent=ClaudeCodeAdapter())

    # En tests
    test_service = CodeAnalysisService(
        agent=MockAdapter(["El archivo tiene 3 problemas: ..."])
    )

    result = await test_service.analyze_file("src/auth.py")
    print(result)

Usar con LangChain

# langchain_interop.py
"""
Ejemplo de cómo usar Claude Code SDK como un tool de LangChain.
"""
from typing import Optional, Type
# from langchain.tools import BaseTool  # Requiere langchain instalado
# from pydantic import BaseModel, Field
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions


# class ClaudeCodeInput(BaseModel):
#     prompt: str = Field(description="Tarea para el agente Claude Code")
#     tools: list[str] = Field(default=["Read", "Bash"])


# class ClaudeCodeTool(BaseTool):
#     """LangChain Tool que delega a Claude Code SDK."""
#
#     name: str = "claude_code_agent"
#     description: str = "Agente que puede leer archivos, ejecutar comandos y editar código"
#     args_schema: Type[BaseModel] = ClaudeCodeInput
#
#     def _run(self, prompt: str, tools: list[str] = ["Read", "Bash"]) -> str:
#         """Versión síncrona (requerida por LangChain)."""
#         return asyncio.run(self._arun(prompt, tools))
#
#     async def _arun(self, prompt: str, tools: list[str] = ["Read", "Bash"]) -> str:
#         """Versión asíncrona."""
#         results = []
#         async for message in query(
#             prompt,
#             options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
#         ):
#             if hasattr(message, 'content') and message.content:
#                 results.append(str(message.content))
#         return "\n".join(results)

# Patrón manual sin LangChain (más simple y directo):
async def claude_as_tool(prompt: str, tools: list[str] | None = None) -> str:
    """Claude Code como herramienta reutilizable."""
    if tools is None:
        tools = ["Read", "Bash"]

    results = []
    async for message in query(
        prompt,
        options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)

9. El Futuro del SDK

Funcionalidades en Roadmap

Basado en la dirección del proyecto y las tendencias del ecosistema:

roadmap
    title Roadmap del Claude Code SDK (estimado)
    section Corto plazo
        Mejor soporte para streaming en tiempo real : done, 2025-Q4
        ClaudeSDKClient estable : active, 2026-Q1
        Más opciones de configuración de herramientas : active, 2026-Q1
    section Mediano plazo
        SDK para más lenguajes (Go, Rust, Java) : 2026-Q2
        Built-in rate limiting y retry : 2026-Q2
        Soporte para modelos Haiku/Sonnet específicos : 2026-Q3
    section Largo plazo
        Orquestación multi-agente nativa : 2026-Q4
        Memoria persistente integrada : 2027-Q1
        Evaluación automática de calidad : 2027-Q2

MCP y el Ecosistema Creciente

flowchart TB
    subgraph MCPEcosystem["Ecosistema MCP (Model Context Protocol)"]
        SDK["Claude Code SDK"] --> |"se conecta a"| MCP["MCP Servers"]

        subgraph MCPServers["MCP Servers disponibles"]
            direction LR
            DB["Bases de datos\n(PostgreSQL, SQLite)"]
            Git["Git\n(GitHub, GitLab)"]
            Cloud["Cloud\n(AWS, GCP, Azure)"]
            Custom["Custom\n(tu empresa)"]
        end

        MCP --> DB
        MCP --> Git
        MCP --> Cloud
        MCP --> Custom
    end

El ecosistema MCP está creciendo rápidamente. En 2026, hay cientos de MCP servers disponibles que extienden las capacidades del SDK sin necesidad de actualizar el SDK mismo. Ver modelcontextprotocol.io para la lista completa.

Tendencias en Agentes de IA

flowchart LR
    Hoy["2026: Hoy"] --> |"Evolucionando hacia"| Futuro["2027-2028"]

    subgraph Hoy
        H1["Agentes que\neusan herramientas"]
        H2["Supervisión\nhumana frecuente"]
        H3["Tareas de\nhoras/días"]
    end

    subgraph Futuro
        F1["Agentes que\naprender y se adaptan"]
        F2["Autonomía\ncreciente"]
        F3["Tareas de\nsemanas/meses"]
    end

Implicaciones para el SDK:


Resumen del Tutorial Completo

Hemos recorrido los 20 capítulos del tutorial, desde los conceptos fundamentales hasta las implementaciones avanzadas:

flowchart TB
    subgraph Fundamentos["Fundamentos (Cap 1-5)"]
        C1["1. Introducción"]
        C2["2. Instalación"]
        C3["3. Query básico"]
        C4["4. Herramientas"]
        C5["5. MCP"]
    end

    subgraph Core["Core Features (Cap 6-10)"]
        C6["6. Hooks"]
        C7["7. Subagentes"]
        C8["8. Sesiones"]
        C9["9. Casos de uso"]
        C10["10. Mejores prácticas"]
    end

    subgraph Avanzado["Avanzado (Cap 11-15)"]
        C11["11-15. Patrones avanzados\ny optimización"]
    end

    subgraph Produccion["Producción (Cap 16-20)"]
        C16["16. Seguridad"]
        C17["17. Deployment"]
        C18["18. Observabilidad"]
        C19["19. Multi-agente avanzado"]
        C20["20. SDK Internals"]
    end

    Fundamentos --> Core
    Core --> Avanzado
    Avanzado --> Produccion

Los 5 principios que guían todo el tutorial:

  1. Seguridad primero: Validar inputs, limitar herramientas, no exponer secrets
  2. Observabilidad completa: Logs, métricas, trazas para cada query
  3. Resiliencia: Retry, circuit breaker, graceful degradation
  4. Economía: Monitorear costos, limitar tokens, cachear cuando es posible
  5. Mantenibilidad: Código limpio, tipos estrictos, tests automatizados

Recursos oficiales:

Volver al índice: Tutorial Completo - Índice


Apéndice: Recetas Rápidas

Esta sección concentra snippets listos para copiar y usar directamente en proyectos reales.

Receta 1: Agente con Timeout Configurable

# recipe_timeout.py
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions


async def agent_with_timeout(
    prompt: str,
    tools: list[str],
    timeout_seconds: float = 300.0
) -> str | None:
    """
    Ejecuta el agente con un timeout máximo.
    Retorna None si el agente no termina a tiempo.
    """
    try:
        results = []
        async with asyncio.timeout(timeout_seconds):
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(
                    allowed_tools=tools,
                    max_turns=50
                )
            ):
                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))
        return "\n".join(results)

    except asyncio.TimeoutError:
        print(f"[Timeout] El agente excedió {timeout_seconds}s")
        return None

Receta 2: Agente con Retry Exponencial

# recipe_retry.py
import asyncio
import random
from claude_code_sdk import query, ClaudeCodeOptions


async def agent_with_retry(
    prompt: str,
    tools: list[str],
    max_retries: int = 3,
    base_delay: float = 2.0
) -> str:
    """
    Ejecuta el agente con retry y backoff exponencial.
    Ideal para entornos con rate limiting intermitente.
    """
    last_error = None

    for attempt in range(max_retries):
        try:
            results = []
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
            ):
                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))
            return "\n".join(results)

        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                # Backoff exponencial con jitter
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"[Retry] Intento {attempt + 1} falló: {e}. Reintentando en {delay:.1f}s")
                await asyncio.sleep(delay)

    raise RuntimeError(f"Falló después de {max_retries} intentos: {last_error}")

Receta 3: Ejecutar Múltiples Agentes en Paralelo con Semáforo

# recipe_parallel.py
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions


async def parallel_agents_with_semaphore(
    tasks: list[str],
    tools: list[str],
    max_concurrent: int = 3
) -> list[str]:
    """
    Ejecuta múltiples agentes en paralelo con un límite de concurrencia.
    El semáforo evita saturar la API con demasiadas solicitudes simultáneas.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task: str, index: int) -> tuple[int, str]:
        async with semaphore:
            print(f"[Parallel] Iniciando tarea {index + 1}/{len(tasks)}")
            results = []
            async for message in query(
                prompt=task,
                options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
            ):
                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))
            print(f"[Parallel] Tarea {index + 1} completada")
            return index, "\n".join(results)

    # Crear todas las coroutines
    coroutines = [run_one(task, i) for i, task in enumerate(tasks)]

    # Ejecutar con gather (respeta el semáforo)
    results_with_index = await asyncio.gather(*coroutines)

    # Ordenar por índice original y retornar
    sorted_results = sorted(results_with_index, key=lambda x: x[0])
    return [result for _, result in sorted_results]

Receta 4: Streaming de Resultados a un Cliente HTTP (SSE)

# recipe_sse.py
"""
Transmite el output del agente a un cliente HTTP via Server-Sent Events (SSE).
Permite que el cliente vea el progreso en tiempo real.
"""
import asyncio
import json
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from claude_code_sdk import query, ClaudeCodeOptions

app = FastAPI()


async def agent_event_stream(prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
    """Genera eventos SSE del progreso del agente."""
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(allowed_tools=tools, max_turns=50)
        ):
            message_type = type(message).__name__

            if message_type == "AssistantMessage":
                content = str(getattr(message, 'content', ''))
                if content:
                    event_data = json.dumps({
                        "type": "content",
                        "text": content[:500]  # Truncar para SSE
                    })
                    yield f"data: {event_data}\n\n"

            elif message_type == "ToolUseBlock":
                tool_name = getattr(message, 'name', 'unknown')
                event_data = json.dumps({
                    "type": "tool_call",
                    "tool": tool_name
                })
                yield f"data: {event_data}\n\n"

            # Señal de fin
            if hasattr(message, 'cost_usd'):
                cost = getattr(message, 'cost_usd', 0) or 0
                event_data = json.dumps({
                    "type": "complete",
                    "cost_usd": float(cost)
                })
                yield f"data: {event_data}\n\n"

    except Exception as e:
        error_data = json.dumps({"type": "error", "message": str(e)})
        yield f"data: {error_data}\n\n"


@app.get("/agent/stream")
async def stream_agent(prompt: str):
    """Endpoint SSE para streaming del agente."""
    return StreamingResponse(
        agent_event_stream(prompt, ["Read", "Bash"]),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Nginx: deshabilitar buffering
        }
    )

Receta 5: Caché de Resultados con TTL

# recipe_cache.py
"""
Caché de resultados del agente para queries idénticas o similares.
Reduce costos evitando ejecutar el mismo análisis dos veces.
"""
import asyncio
import hashlib
import json
import time
from typing import Any
import redis.asyncio as redis
from claude_code_sdk import query, ClaudeCodeOptions


class AgentResultCache:
    """Caché de resultados con TTL en Redis."""

    def __init__(self, redis_client: redis.Redis, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def _cache_key(self, prompt: str, tools: list[str]) -> str:
        """Genera clave de caché determinística."""
        content = json.dumps({"prompt": prompt, "tools": sorted(tools)}, sort_keys=True)
        return f"agent_cache:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    async def get(self, prompt: str, tools: list[str]) -> str | None:
        """Obtiene resultado cacheado si existe."""
        key = self._cache_key(prompt, tools)
        cached = await self.redis.get(key)
        if cached:
            data = json.loads(cached)
            print(f"[Cache] HIT para key {key[:20]}...")
            return data["result"]
        return None

    async def set(self, prompt: str, tools: list[str], result: str) -> None:
        """Guarda resultado en caché."""
        key = self._cache_key(prompt, tools)
        await self.redis.setex(
            key,
            self.ttl,
            json.dumps({"result": result, "cached_at": time.time()})
        )
        print(f"[Cache] MISS guardado para key {key[:20]}...")

    async def run_with_cache(
        self,
        prompt: str,
        tools: list[str] | None = None
    ) -> str:
        """Ejecuta el agente con caché automático."""
        if tools is None:
            tools = ["Read", "Bash"]

        # Intentar caché primero
        cached = await self.get(prompt, tools)
        if cached:
            return cached

        # Ejecutar agente
        results = []
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))

        result = "\n".join(results)

        # Guardar en caché solo si el resultado no es vacío
        if result.strip():
            await self.set(prompt, tools, result)

        return result

Receta 6: Agente con Circuit Breaker

# recipe_circuit_breaker.py
"""
Circuit breaker para el agente: si hay muchos fallos consecutivos,
abre el circuito temporalmente para evitar saturar la API.
"""
import asyncio
import time
from enum import Enum
from claude_code_sdk import query, ClaudeCodeOptions


class CircuitState(str, Enum):
    CLOSED = "closed"      # Normal: peticiones pasan
    OPEN = "open"          # Fallo: peticiones rechazadas
    HALF_OPEN = "half_open"  # Prueba: una petición de prueba


class AgentCircuitBreaker:
    """Circuit breaker para proteger contra fallos en cascada."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: float = 60.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.success_threshold = success_threshold

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float | None = None

    @property
    def state(self) -> CircuitState:
        """Verifica si el circuito debe pasar de OPEN a HALF_OPEN."""
        if (
            self._state == CircuitState.OPEN and
            self._last_failure_time is not None and
            time.time() - self._last_failure_time > self.reset_timeout
        ):
            self._state = CircuitState.HALF_OPEN
            self._success_count = 0
        return self._state

    def _on_success(self) -> None:
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                print("[CircuitBreaker] Circuito cerrado (recuperado)")
                self._state = CircuitState.CLOSED
                self._failure_count = 0
        elif self._state == CircuitState.CLOSED:
            self._failure_count = 0

    def _on_failure(self) -> None:
        self._failure_count += 1
        self._last_failure_time = time.time()

        if self._failure_count >= self.failure_threshold:
            print(f"[CircuitBreaker] Circuito ABIERTO después de {self._failure_count} fallos")
            self._state = CircuitState.OPEN

    async def call(self, prompt: str, tools: list[str]) -> str:
        """Ejecuta el agente con protección de circuit breaker."""
        if self.state == CircuitState.OPEN:
            raise RuntimeError("Circuit breaker ABIERTO: servicio no disponible temporalmente")

        try:
            results = []
            async for message in query(
                prompt=prompt,
                options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
            ):
                if hasattr(message, 'content') and message.content:
                    results.append(str(message.content))

            self._on_success()
            return "\n".join(results)

        except Exception as e:
            self._on_failure()
            raise

Notas Finales

El Claude Code SDK es una herramienta en evolución activa. Algunas consideraciones finales para trabajar con él a largo plazo:

  1. Versionar el SDK: Pin a versiones específicas en producción (claude-code-sdk==0.0.14). El SDK puede tener cambios breaking entre versiones menores mientras está en la fase 0.x.

  2. Probar contra el CLI real: El SDK depende del CLI claude instalado localmente. En CI/CD, asegúrate de que el CLI esté instalado en el environment de build. Usar npm install -g @anthropic-ai/claude-code en el Dockerfile o en los steps de CI.

  3. Costs en producción: El cost_usd que reporta el SDK es el costo de los tokens de la API. Los costos reales pueden diferir ligeramente por promociones, descuentos por volumen o actualizaciones de pricing. Siempre confirmar con el dashboard de Anthropic.

  4. Session IDs son efímeros: Los session_id son específicos a la instalación del CLI y la instancia de Docker. No son portables entre máquinas ni entre versiones del CLI. Usar solo para reanudar dentro de la misma sesión activa.

  5. Seguridad ante todo: Nunca hardcodear ANTHROPIC_API_KEY. Usar variables de entorno o secrets managers. El SDK la lee automáticamente de ANTHROPIC_API_KEY en el environment.

Volver al índice: Tutorial Completo - Índice


10. Protocolo JSON Lines en Detalle

Cada Mensaje es una Línea JSON

El protocolo de comunicación entre el SDK y el proceso claude -p es JSON Lines: cada mensaje es un objeto JSON completo en una sola línea, separados por \n. Esto es diferente de JSON streaming (como NDJSON o JSON streaming de LLMs) porque cada línea es un documento JSON completo y auto-contenido.

sequenceDiagram
    participant App as Tu Aplicación
    participant SDK as SDK query()
    participant Proc as claude -p (subprocess)
    participant API as Anthropic API

    App->>SDK: query(prompt, options)
    SDK->>Proc: spawn(["claude", "-p", "--output-format", "stream-json"])
    SDK->>Proc: stdin.write(prompt_json + "\n")
    SDK->>Proc: stdin.close()

    Proc->>API: POST /v1/messages (HTTPS)
    API-->>Proc: stream de tokens

    Proc-->>SDK: stdout: {"type":"system","subtype":"init",...}\n
    SDK-->>App: yield SystemMessage

    Proc-->>SDK: stdout: {"type":"assistant","message":{...}}\n
    SDK-->>App: yield AssistantMessage

    Proc-->>SDK: stdout: {"type":"tool","tool_use_id":"tu_001",...}\n
    SDK-->>App: yield ToolResultMessage

    Proc-->>SDK: stdout: {"type":"result","subtype":"success",...}\n
    SDK-->>App: yield ResultMessage

    Proc-->>SDK: EOF en stdout
    SDK->>SDK: await process.wait()
    SDK-->>App: StopAsyncIteration

Schema Completo de Cada Tipo de Mensaje

Mensaje tipo system — Inicialización

{
  "type": "system",
  "subtype": "init",
  "session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "tools": [
    {
      "name": "Read",
      "description": "Read a file from the filesystem",
      "input_schema": {
        "type": "object",
        "properties": {
          "file_path": { "type": "string", "description": "Absolute path to file" },
          "offset": { "type": "integer", "description": "Line number to start from" },
          "limit": { "type": "integer", "description": "Number of lines to read" }
        },
        "required": ["file_path"]
      }
    },
    {
      "name": "Bash",
      "description": "Execute a bash command",
      "input_schema": {
        "type": "object",
        "properties": {
          "command": { "type": "string", "description": "The bash command to execute" },
          "timeout": { "type": "integer", "description": "Timeout in milliseconds" }
        },
        "required": ["command"]
      }
    }
  ],
  "mcp_servers": [],
  "model": "claude-opus-4-5",
  "permissionMode": "default",
  "apiKeySource": "environment"
}

Mensaje tipo assistant — Respuesta del LLM

{
  "type": "assistant",
  "message": {
    "id": "msg_01XFDUDYJgAACzvnptvVoYEL",
    "type": "message",
    "role": "assistant",
    "content": [
      {
        "type": "text",
        "text": "Voy a analizar el archivo auth.py para encontrar bugs de seguridad."
      },
      {
        "type": "tool_use",
        "id": "toolu_01A09q90qw90lq917835lq9",
        "name": "Read",
        "input": {
          "file_path": "/home/user/project/auth.py",
          "offset": 1,
          "limit": 100
        }
      }
    ],
    "model": "claude-opus-4-5",
    "stop_reason": "tool_use",
    "stop_sequence": null,
    "usage": {
      "input_tokens": 1247,
      "output_tokens": 89,
      "cache_creation_input_tokens": 1100,
      "cache_read_input_tokens": 0
    }
  }
}

Mensaje tipo user — Resultado de Herramienta

Este mensaje lo genera el CLI cuando ejecuta la herramienta y empaqueta el resultado para devolverlo al LLM:

{
  "type": "user",
  "message": {
    "role": "user",
    "content": [
      {
        "type": "tool_result",
        "tool_use_id": "toolu_01A09q90qw90lq917835lq9",
        "content": "     1→import hashlib\n     2→import secrets\n     3→import bcrypt\n     4→\n     5→def authenticate(username: str, password: str) -> bool:\n     6→    # BUG: comparación directa de strings (timing attack)\n     7→    stored = get_password_hash(username)\n     8→    return stored == password\n",
        "is_error": false
      }
    ]
  }
}

Mensaje tipo result — Resultado Final

{
  "type": "result",
  "subtype": "success",
  "is_error": false,
  "result": "Encontré 2 vulnerabilidades de seguridad en auth.py:\n\n1. **Timing Attack** (línea 8): La comparación `stored == password` es vulnerable...\n\n2. **Hash débil** (línea 15): Se usa MD5 para hashear passwords...",
  "session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "cost_usd": 0.02341,
  "duration_ms": 12543,
  "num_turns": 4,
  "usage": {
    "input_tokens": 5823,
    "output_tokens": 342,
    "cache_creation_input_tokens": 4100,
    "cache_read_input_tokens": 1200
  }
}

Mensaje tipo result con error

{
  "type": "result",
  "subtype": "error",
  "is_error": true,
  "error": "max_turns_exceeded",
  "result": "",
  "session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "cost_usd": 0.045,
  "duration_ms": 89234,
  "num_turns": 50
}

Cómo el SDK Parsea y Valida Cada Mensaje

# sdk_parser_explained.py
"""
Cómo el SDK convierte líneas JSON en objetos Python tipados.
Este código es educativo — el SDK real es más complejo.
"""
import json
from typing import Any
from dataclasses import dataclass


@dataclass
class ParsedMessage:
    raw_type: str
    data: dict[str, Any]


def parse_sdk_line(line: str) -> ParsedMessage | None:
    """
    Parsea una línea JSON del stdout del subproceso.

    Retorna None si la línea está vacía o no es JSON válido.
    El SDK real puede lanzar excepciones específicas en errores de parsing.
    """
    stripped = line.strip()

    # Líneas vacías son normales (buffering del subprocess)
    if not stripped:
        return None

    # Intentar parsear como JSON
    try:
        data = json.loads(stripped)
    except json.JSONDecodeError as e:
        # El SDK real loggea esto como warning y continúa
        # Puede ocurrir si el CLI escribe output no-JSON (ej: mensajes de error)
        print(f"[WARN] Línea no-JSON ignorada: {stripped[:100]!r}{e}")
        return None

    # Validar que tiene el campo 'type'
    msg_type = data.get("type")
    if not isinstance(msg_type, str):
        print(f"[WARN] Mensaje sin 'type' ignorado: {data}")
        return None

    return ParsedMessage(raw_type=msg_type, data=data)


def dispatch_message(parsed: ParsedMessage) -> Any:
    """
    Convierte el mensaje parseado en el objeto Python correspondiente.
    El SDK real usa dataclasses o Pydantic models.
    """
    match parsed.raw_type:
        case "system":
            return handle_system_message(parsed.data)
        case "assistant":
            return handle_assistant_message(parsed.data)
        case "user":
            return handle_user_message(parsed.data)
        case "result":
            return handle_result_message(parsed.data)
        case _:
            # Tipos desconocidos: el SDK los ignora silenciosamente
            # Esto permite compatibilidad futura con nuevos tipos
            print(f"[DEBUG] Tipo de mensaje desconocido: {parsed.raw_type}")
            return None


def handle_result_message(data: dict) -> dict:
    """Maneja el mensaje de resultado final."""
    return {
        "type": "result",
        "success": data.get("subtype") == "success",
        "result": data.get("result", ""),
        "session_id": data.get("session_id"),
        "cost_usd": data.get("cost_usd"),
        "duration_ms": data.get("duration_ms"),
        "num_turns": data.get("num_turns"),
        "usage": data.get("usage", {}),
        "error": data.get("error"),
    }


def handle_system_message(data: dict) -> dict:
    return {"type": "system", "session_id": data.get("session_id"), "tools": data.get("tools", [])}


def handle_assistant_message(data: dict) -> dict:
    return {"type": "assistant", "message": data.get("message", {})}


def handle_user_message(data: dict) -> dict:
    return {"type": "user", "message": data.get("message", {})}

Capturar el Protocolo Raw para Debugging

# capture_raw_protocol.py
"""
Captura el protocolo raw JSON Lines para debugging.
Útil cuando el SDK se comporta de forma inesperada.
"""
import asyncio
import json
import sys
from datetime import datetime


async def capture_protocol_session(
    prompt: str,
    output_file: str = "/tmp/claude_protocol_capture.jsonl"
) -> None:
    """
    Spawna claude -p directamente y captura todos los mensajes raw.
    Los guarda en un archivo JSONL para análisis posterior.
    """
    process = await asyncio.create_subprocess_exec(
        "claude", "-p",
        "--output-format", "stream-json",
        "--verbose",
        "--allowedTools", "Read,Bash",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Enviar prompt
    process.stdin.write(prompt.encode() + b"\n")
    await process.stdin.drain()
    process.stdin.close()

    messages_captured = []

    async def capture_stdout():
        async for raw_line in process.stdout:
            decoded = raw_line.decode().strip()
            if decoded:
                entry = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "source": "stdout",
                    "raw": decoded,
                }
                try:
                    entry["parsed"] = json.loads(decoded)
                    entry["type"] = entry["parsed"].get("type", "unknown")
                except json.JSONDecodeError:
                    entry["parse_error"] = True

                messages_captured.append(entry)
                # Imprimir en tiempo real para debugging
                msg_type = entry.get("type", "?")
                print(f"[{msg_type:12s}] {decoded[:120]}")

    async def capture_stderr():
        async for raw_line in process.stderr:
            decoded = raw_line.decode().strip()
            if decoded:
                print(f"[STDERR] {decoded}", file=sys.stderr)

    await asyncio.gather(capture_stdout(), capture_stderr())
    await process.wait()

    # Guardar captura
    with open(output_file, "w") as f:
        for entry in messages_captured:
            f.write(json.dumps(entry) + "\n")

    print(f"\n[CAPTURE] {len(messages_captured)} mensajes guardados en {output_file}")
    print(f"[CAPTURE] Exit code: {process.returncode}")


# Uso:
# asyncio.run(capture_protocol_session("Lista los archivos Python"))
# Luego analizar: cat /tmp/claude_protocol_capture.jsonl | jq '.type'

TypeScript: Parsear el Protocolo Manualmente

// protocol-parser.ts
import { spawn } from "node:child_process";
import * as readline from "node:readline";

interface SDKProtocolMessage {
  type: "system" | "assistant" | "user" | "result" | "unknown";
  raw: Record<string, unknown>;
  timestamp: Date;
}

async function* captureRawProtocol(
  prompt: string
): AsyncGenerator<SDKProtocolMessage> {
  const process = spawn("claude", [
    "-p",
    "--output-format",
    "stream-json",
    "--allowedTools",
    "Read,Bash",
  ]);

  process.stdin.write(prompt + "\n");
  process.stdin.end();

  const rl = readline.createInterface({ input: process.stdout });
  const messageQueue: SDKProtocolMessage[] = [];
  let resolve: (() => void) | null = null;
  let done = false;

  rl.on("line", (line) => {
    if (!line.trim()) return;
    try {
      const raw = JSON.parse(line) as Record<string, unknown>;
      const msg: SDKProtocolMessage = {
        type: (raw.type as SDKProtocolMessage["type"]) ?? "unknown",
        raw,
        timestamp: new Date(),
      };
      messageQueue.push(msg);
      resolve?.();
      resolve = null;
    } catch {
      console.error(`[PARSE ERROR] ${line.slice(0, 100)}`);
    }
  });

  rl.on("close", () => {
    done = true;
    resolve?.();
  });

  while (!done || messageQueue.length > 0) {
    if (messageQueue.length > 0) {
      yield messageQueue.shift()!;
    } else {
      await new Promise<void>((r) => {
        resolve = r;
      });
    }
  }
}

// Uso: capturar y analizar el protocolo
async function analyzeProtocol(prompt: string): Promise<void> {
  const stats: Record<string, number> = {};

  for await (const msg of captureRawProtocol(prompt)) {
    stats[msg.type] = (stats[msg.type] ?? 0) + 1;
    console.log(`[${msg.timestamp.toISOString()}] type=${msg.type}`);

    if (msg.type === "result") {
      const result = msg.raw as { cost_usd?: number; num_turns?: number };
      console.log(`  cost_usd: $${result.cost_usd?.toFixed(4)}`);
      console.log(`  num_turns: ${result.num_turns}`);
    }
  }

  console.log("\nMessage type distribution:", stats);
}

export { captureRawProtocol, analyzeProtocol, SDKProtocolMessage };

11. Implementar un SDK desde Cero

Esta sección es educativa: implementar query() mínimo desde cero ayuda a entender el SDK real y a debuggear cuando algo falla.

Implementación Mínima Funcional en Python

# minimal_sdk.py
"""
Implementación mínima y funcional de query() desde cero.
Ayuda a entender el SDK real y a debuggear.

NO usar en producción — el SDK oficial tiene:
- Mejor manejo de errores
- Type safety
- Cancelación limpia
- Manejo de MCP
- Reconexión
"""
import asyncio
import json
import os
import shutil
from typing import AsyncGenerator, Any


class MinimalSDKError(Exception):
    """Error del SDK mínimo."""
    pass


class MinimalQueryOptions:
    """Opciones simplificadas para el SDK mínimo."""

    def __init__(
        self,
        allowed_tools: list[str] | None = None,
        max_turns: int = 50,
        system_prompt: str | None = None,
        model: str | None = None,
    ):
        self.allowed_tools = allowed_tools or []
        self.max_turns = max_turns
        self.system_prompt = system_prompt
        self.model = model


async def minimal_query(
    prompt: str,
    options: MinimalQueryOptions | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
    """
    Implementación mínima de query().

    Demuestra el mecanismo fundamental:
    1. Spawnar claude -p con los flags correctos
    2. Enviar prompt por stdin
    3. Leer JSON Lines de stdout
    4. Parsear y hacer yield de cada mensaje
    5. Manejar errores del proceso
    """
    opts = options or MinimalQueryOptions()

    # 1. Construir el comando
    cmd = [
        "claude",
        "-p",
        "--output-format", "stream-json",
        "--max-turns", str(opts.max_turns),
    ]

    for tool in opts.allowed_tools:
        cmd.extend(["--allowedTools", tool])

    if opts.system_prompt:
        cmd.extend(["--system-prompt", opts.system_prompt])

    if opts.model:
        cmd.extend(["--model", opts.model])

    # 2. Verificar que claude está instalado
    if not shutil.which("claude"):
        raise MinimalSDKError(
            "claude CLI no encontrado. Instalar con: npm install -g @anthropic-ai/claude-code"
        )

    # 3. Spawnar el subproceso
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env={
                **os.environ,
                "CLAUDE_CODE_NONINTERACTIVE": "1",
            }
        )
    except FileNotFoundError:
        raise MinimalSDKError("claude CLI no encontrado en PATH")

    # 4. Enviar el prompt por stdin
    try:
        process.stdin.write(prompt.encode("utf-8") + b"\n")
        await process.stdin.drain()
        process.stdin.close()
    except BrokenPipeError:
        stderr_output = await process.stderr.read()
        raise MinimalSDKError(
            f"El proceso falló antes de recibir el prompt: {stderr_output.decode()[:500]}"
        )

    # 5. Leer y parsear JSON Lines del stdout
    parse_errors = 0
    messages_received = 0

    async for raw_line in process.stdout:
        line = raw_line.decode("utf-8", errors="replace").strip()

        if not line:
            continue

        try:
            message = json.loads(line)
            messages_received += 1

            if not isinstance(message, dict) or "type" not in message:
                continue

            yield message

        except json.JSONDecodeError:
            parse_errors += 1
            if parse_errors > 10:
                raise MinimalSDKError(
                    f"Demasiados errores de parsing JSON ({parse_errors}). "
                    f"Última línea: {line[:200]!r}"
                )
            continue

    # 6. Verificar el código de salida del proceso
    return_code = await process.wait()

    if return_code != 0:
        stderr_output = b""
        try:
            stderr_output = await asyncio.wait_for(process.stderr.read(), timeout=2.0)
        except asyncio.TimeoutError:
            pass

        raise MinimalSDKError(
            f"claude CLI terminó con código {return_code}. "
            f"stderr: {stderr_output.decode()[:500]}"
        )

    if messages_received == 0:
        raise MinimalSDKError(
            "claude CLI no produjo ningún mensaje. "
            "Verificar que ANTHROPIC_API_KEY esté configurado."
        )


async def demo_minimal_sdk():
    """Demostración del SDK mínimo."""
    print("Ejecutando query con SDK mínimo...")

    async for message in minimal_query(
        prompt="Lista los archivos Python en el directorio actual",
        options=MinimalQueryOptions(
            allowed_tools=["Bash"],
            max_turns=10,
        )
    ):
        msg_type = message.get("type", "?")

        if msg_type == "assistant":
            content = message.get("message", {}).get("content", [])
            for block in content:
                if block.get("type") == "text":
                    print(f"[ASSISTANT] {block['text'][:200]}")
                elif block.get("type") == "tool_use":
                    print(f"[TOOL CALL] {block['name']}: {str(block['input'])[:100]}")

        elif msg_type == "result":
            cost = message.get("cost_usd", 0)
            turns = message.get("num_turns", 0)
            print(f"\n[DONE] {turns} turnos, costo: ${cost:.4f}")

Implementación Mínima en TypeScript

// minimal-sdk.ts
import { spawn } from "node:child_process";
import * as readline from "node:readline";

interface MinimalOptions {
  allowedTools?: string[];
  maxTurns?: number;
  systemPrompt?: string;
  model?: string;
}

interface SDKMessage {
  type: string;
  [key: string]: unknown;
}

async function* minimalQuery(
  prompt: string,
  options: MinimalOptions = {}
): AsyncGenerator<SDKMessage> {
  const { allowedTools = [], maxTurns = 50, systemPrompt, model } = options;

  const args = ["-p", "--output-format", "stream-json", "--max-turns", String(maxTurns)];

  for (const tool of allowedTools) args.push("--allowedTools", tool);
  if (systemPrompt) args.push("--system-prompt", systemPrompt);
  if (model) args.push("--model", model);

  const proc = spawn("claude", args, {
    stdio: ["pipe", "pipe", "pipe"],
    env: { ...process.env, CLAUDE_CODE_NONINTERACTIVE: "1" },
  });

  proc.stdin.write(prompt + "\n");
  proc.stdin.end();

  const rl = readline.createInterface({ input: proc.stdout, crlfDelay: Infinity });
  const queue: SDKMessage[] = [];
  const waiters: Array<() => void> = [];
  let finished = false;

  rl.on("line", (line) => {
    const trimmed = line.trim();
    if (!trimmed) return;
    try {
      const msg = JSON.parse(trimmed) as SDKMessage;
      if (waiters.length > 0) {
        queue.push(msg);
        waiters.shift()!();
      } else {
        queue.push(msg);
      }
    } catch {
      // Ignorar líneas no-JSON
    }
  });

  rl.on("close", () => {
    finished = true;
    waiters.forEach((r) => r());
    waiters.length = 0;
  });

  while (!finished || queue.length > 0) {
    if (queue.length > 0) {
      yield queue.shift()!;
    } else if (!finished) {
      await new Promise<void>((resolve) => {
        waiters.push(resolve);
      });
    }
  }
}

export { minimalQuery, MinimalOptions, SDKMessage };

Diferencia entre stdin.close() y stdin Persistente

# stdin_modes.py
"""
Explicación de los dos modos de stdin:
1. stdin.close() inmediato: para queries únicas (modo actual del SDK)
2. stdin persistente: para sesiones multi-turn sin reiniciar el proceso (hipotético)
"""
import asyncio
import json


async def single_query_mode(prompt: str):
    """
    Modo actual del SDK: stdin se cierra después del primer prompt.
    El proceso termina después de completar el query.
    """
    process = await asyncio.create_subprocess_exec(
        "claude", "-p", "--output-format", "stream-json",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Enviar prompt
    process.stdin.write(prompt.encode() + b"\n")
    await process.stdin.drain()
    process.stdin.close()  # EOF: el CLI sabe que no vendrán más prompts

    # Leer hasta EOF del stdout
    async for line in process.stdout:
        decoded = line.decode().strip()
        if decoded:
            msg = json.loads(decoded)
            print(f"[MODE:SINGLE] {msg.get('type')}")

    await process.wait()
    # Proceso terminó — para el siguiente query, se spawna uno nuevo


# Por qué el SDK no reutiliza el proceso:
# 1. Cada query puede necesitar un working directory diferente
# 2. Los hooks (PreToolUse, PostToolUse) pueden tener estado que no debería persistir
# 3. El estado del CLI (historial de conversación) se resetea limpiamente
# 4. Facilita el manejo de errores: si el proceso muere, solo afecta una query

12. Monkey Patching y Extensión en Runtime

Interceptar query() para Logging Automático

# monkey_patching.py
"""
Monkey patching del SDK sin modificar su código fuente.
Útil para agregar comportamiento cross-cutting (logging, metrics, tracing)
sin cambiar cada llamada a query().
"""
import time
from typing import AsyncGenerator, Any
import claude_code_sdk
from claude_code_sdk import ClaudeCodeOptions


# Guardar referencia al original antes de patchear
_original_query = claude_code_sdk.query


async def _instrumented_query(
    prompt: str,
    options: ClaudeCodeOptions | None = None,
    **kwargs
) -> AsyncGenerator[Any, None]:
    """
    Versión instrumentada de query() que agrega logging automático.
    """
    start = time.monotonic()
    tool_calls = []

    print(f"[INTERCEPT] query() iniciado — prompt: {prompt[:80]!r}")

    async for message in _original_query(prompt, options, **kwargs):
        msg_type = type(message).__name__

        if msg_type == "ToolUseBlock":
            tool_calls.append(getattr(message, 'name', 'unknown'))

        yield message

    duration = time.monotonic() - start
    print(f"[INTERCEPT] query() completado — {duration:.2f}s, tools: {tool_calls}")


def install_query_interceptor():
    """Instala el interceptor globalmente."""
    claude_code_sdk.query = _instrumented_query
    print("[INTERCEPT] Interceptor instalado")


def uninstall_query_interceptor():
    """Restaura query() original."""
    claude_code_sdk.query = _original_query
    print("[INTERCEPT] Interceptor desinstalado")


# Context manager para interceptación temporal
from contextlib import asynccontextmanager


@asynccontextmanager
async def with_query_interceptor(interceptor_fn):
    """
    Context manager que instala y desinstala un interceptor.

    Uso:
        async with with_query_interceptor(my_interceptor):
            result = await run_agent(...)
    """
    original = claude_code_sdk.query
    claude_code_sdk.query = interceptor_fn
    try:
        yield
    finally:
        claude_code_sdk.query = original

Decorators para Agregar Comportamiento

# sdk_decorators.py
"""
Decorators para agregar comportamiento a funciones que usan el SDK.
Principio Open/Closed: extender sin modificar.
"""
import asyncio
import functools
from typing import Callable


def with_timeout(timeout_seconds: float):
    """Agrega timeout a cualquier función async que use el SDK."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_seconds)
            except asyncio.TimeoutError:
                raise TimeoutError(f"{func.__name__} excedió {timeout_seconds}s")
        return wrapper
    return decorator


def with_retry(max_attempts: int = 3, delay_seconds: float = 2.0):
    """Agrega retry exponencial."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    if attempt < max_attempts - 1:
                        wait = delay_seconds * (2 ** attempt)
                        print(f"[RETRY] Intento {attempt + 1}: {e}. Esperando {wait:.1f}s")
                        await asyncio.sleep(wait)
            raise RuntimeError(f"Falló tras {max_attempts} intentos: {last_error}")
        return wrapper
    return decorator


def with_cost_limit(max_cost_usd: float):
    """Para la ejecución si el costo supera el límite."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            import claude_code_sdk
            from claude_code_sdk import query as original_query

            total_cost = 0.0

            async def cost_monitoring_query(prompt, options=None, **kw):
                nonlocal total_cost
                async for msg in original_query(prompt, options, **kw):
                    if hasattr(msg, 'cost_usd') and msg.cost_usd:
                        total_cost += float(msg.cost_usd)
                        if total_cost > max_cost_usd:
                            raise RuntimeError(
                                f"Costo excedido: ${total_cost:.4f} > ${max_cost_usd}"
                            )
                    yield msg

            original = claude_code_sdk.query
            claude_code_sdk.query = cost_monitoring_query
            try:
                return await func(*args, **kwargs)
            finally:
                claude_code_sdk.query = original
        return wrapper
    return decorator


# Uso combinado de decorators
@with_timeout(300)
@with_retry(max_attempts=3)
@with_cost_limit(max_cost_usd=0.50)
async def production_agent(prompt: str) -> str:
    from claude_code_sdk import query, ClaudeCodeOptions

    results = []
    async for message in query(
        prompt=prompt,
        options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"], max_turns=30)
    ):
        if hasattr(message, 'content') and message.content:
            results.append(str(message.content))
    return "\n".join(results)

TypeScript: SDK Wrapper con Proxy

// sdk-proxy.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

type QueryFunction = typeof query;
type QueryMiddleware = (
  prompt: string,
  options: ClaudeCodeOptions | undefined,
  next: QueryFunction
) => ReturnType<QueryFunction>;

function createQueryProxy(middlewares: QueryMiddleware[]): QueryFunction {
  return async function* proxiedQuery(prompt: string, options?: ClaudeCodeOptions) {
    const buildNext = (i: number): QueryFunction => {
      if (i < 0) return query;
      return (p, o) => middlewares[i](p, o, buildNext(i - 1));
    };
    yield* buildNext(middlewares.length - 1)(prompt, options);
  };
}

const loggingMiddleware: QueryMiddleware = async function* (prompt, options, next) {
  const start = Date.now();
  console.log(`[LOG] query started: ${prompt.slice(0, 60)}`);
  try {
    yield* next(prompt, options);
  } finally {
    console.log(`[LOG] query completed in ${Date.now() - start}ms`);
  }
};

const costTrackingMiddleware = (maxCostUsd: number): QueryMiddleware =>
  async function* (prompt, options, next) {
    let totalCost = 0;
    for await (const message of next(prompt, options)) {
      if ("cost_usd" in message && typeof message.cost_usd === "number") {
        totalCost += message.cost_usd;
        if (totalCost > maxCostUsd) {
          throw new Error(`Cost limit exceeded: $${totalCost.toFixed(4)}`);
        }
      }
      yield message;
    }
  };

const instrumentedQuery = createQueryProxy([
  loggingMiddleware,
  costTrackingMiddleware(1.00),
]);

export { instrumentedQuery, createQueryProxy, QueryMiddleware };

13. Async Internals — Cómo el SDK Maneja Concurrencia

Por Qué asyncio en Lugar de threading

flowchart LR
    subgraph AsyncIO["asyncio (usado por el SDK)"]
        A1["Single thread\nno race conditions"]
        A2["Non-blocking I/O\nevent loop eficiente"]
        A3["Cancelación limpia\nCancelledError propagable"]
        A4["Composición natural\nasyncio.gather()"]
    end

    subgraph Threading["threading (no usado)"]
        T1["Race conditions\nnecesita locks"]
        T2["Blocking I/O\nhilos bloqueados en readline()"]
        T3["Cancelación difícil\nno se puede matar un thread"]
        T4["Más overhead\ncambio de contexto del OS"]
    end

Flujo Interno del event loop al Leer Mensajes

# async_internals_deep.py
"""
Explicación profunda de cómo asyncio maneja el subproceso.
"""
import asyncio
import json


async def understand_async_subprocess():
    """
    Demostración de cómo asyncio lee del subproceso sin bloquear.
    """

    # asyncio.create_subprocess_exec registra el file descriptor del pipe
    # en el event loop. Cuando hay datos disponibles, el OS notifica al
    # event loop via epoll/kqueue/IOCP (según el OS).
    process = await asyncio.create_subprocess_exec(
        "echo", '{"type":"result","subtype":"success","result":"hola"}',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Cuando llamamos `async for line in process.stdout`:
    # 1. El event loop pregunta al OS: "¿hay datos en el fd del pipe?"
    # 2. Si hay datos: los retorna inmediatamente
    # 3. Si no hay datos: suspende esta coroutine y ejecuta otras
    #    (no bloquea el thread completo)
    # 4. Cuando el OS notifica que hay datos, reanuda esta coroutine

    async for line in process.stdout:
        decoded = line.decode().strip()
        if decoded:
            msg = json.loads(decoded)
            print(f"Recibido: {msg}")
            # Mientras procesamos este mensaje, el event loop puede
            # estar ejecutando otras coroutines en paralelo

    await process.wait()


# ============================================================
# asyncio.Queue para buffering entre lector y consumidor
# ============================================================

async def producer_consumer_pattern(prompt: str):
    """
    Separar lectura y procesamiento con asyncio.Queue.
    El productor lee líneas del proceso.
    El consumidor procesa mensajes.
    La queue hace de buffer entre ambos.
    """
    import os
    queue: asyncio.Queue[dict | None] = asyncio.Queue(maxsize=50)

    process = await asyncio.create_subprocess_exec(
        "claude", "-p", "--output-format", "stream-json",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env={**os.environ}
    )

    process.stdin.write(prompt.encode() + b"\n")
    await process.stdin.drain()
    process.stdin.close()

    # PRODUCTOR: lee del proceso y pone en la queue
    async def producer():
        async for raw_line in process.stdout:
            line = raw_line.decode().strip()
            if not line:
                continue
            try:
                msg = json.loads(line)
                await queue.put(msg)
                # Si la queue está llena (maxsize=50), esta línea
                # bloquea al productor — esto es backpressure natural.
                # El proceso padre se ralentiza, dando tiempo al consumidor.
            except json.JSONDecodeError:
                pass
        await queue.put(None)  # Señal de fin

    # CONSUMIDOR: procesa mensajes de la queue
    async def consumer():
        results = []
        while True:
            msg = await queue.get()
            if msg is None:
                break

            # Procesamiento potencialmente lento sin bloquear al productor
            await asyncio.sleep(0)  # yield al event loop
            if msg.get("type") == "result":
                results.append(msg.get("result", ""))

            queue.task_done()
        return results

    # Ejecutar productor y consumidor concurrentemente
    producer_task = asyncio.create_task(producer())
    results = await consumer()
    await producer_task

    return results


# ============================================================
# Cancelación limpia
# ============================================================

async def cancellable_agent(prompt: str) -> str | None:
    """
    Agente con soporte de cancelación vía asyncio.CancelledError.
    """
    import os
    process = None

    try:
        process = await asyncio.create_subprocess_exec(
            "claude", "-p", "--output-format", "stream-json",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env={**os.environ}
        )

        process.stdin.write(prompt.encode() + b"\n")
        await process.stdin.drain()
        process.stdin.close()

        results = []
        async for raw_line in process.stdout:
            line = raw_line.decode().strip()
            if line:
                msg = json.loads(line)
                if msg.get("type") == "result":
                    results.append(msg.get("result", ""))

        await process.wait()
        return "\n".join(results)

    except asyncio.CancelledError:
        # Cleanup limpio cuando se cancela la tarea
        if process and process.returncode is None:
            print("[CANCEL] Terminando proceso claude...")
            process.terminate()
            try:
                await asyncio.wait_for(process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
        raise  # Re-lanzar CancelledError (importante!)

Concurrencia Real: Múltiples Queries en Paralelo

# real_concurrency.py
"""
Demostración de que múltiples query() en paralelo son realmente
concurrentes — cada uno en su propio subproceso.
"""
import asyncio
import time
from claude_code_sdk import query, ClaudeCodeOptions


async def benchmark_concurrent_vs_sequential(tasks: list[str]) -> dict:
    """
    Compara ejecutar N queries secuencialmente vs concurrentemente.
    Demuestra que el SDK soporta concurrencia real.
    """

    # Ejecutar secuencialmente
    start_seq = time.monotonic()
    seq_results = []
    for task in tasks:
        results = []
        async for message in query(
            prompt=task,
            options=ClaudeCodeOptions(allowed_tools=[], max_turns=2)
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))
        seq_results.append("\n".join(results))
    seq_duration = time.monotonic() - start_seq

    # Ejecutar concurrentemente
    start_con = time.monotonic()

    async def run_one(task: str) -> str:
        results = []
        async for message in query(
            prompt=task,
            options=ClaudeCodeOptions(allowed_tools=[], max_turns=2)
        ):
            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))
        return "\n".join(results)

    con_results = await asyncio.gather(*[run_one(t) for t in tasks])
    con_duration = time.monotonic() - start_con

    speedup = seq_duration / con_duration if con_duration > 0 else 0

    return {
        "sequential_seconds": round(seq_duration, 2),
        "concurrent_seconds": round(con_duration, 2),
        "speedup": round(speedup, 2),
        "tasks_count": len(tasks),
    }

TypeScript: Concurrencia con Promise.all

// ts-concurrency.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";

async function runAllConcurrently(
  tasks: string[],
  maxConcurrent: number = 5
): Promise<string[]> {
  const semaphore = { count: maxConcurrent };
  const waitQueue: Array<() => void> = [];

  function acquire(): Promise<void> {
    if (semaphore.count > 0) {
      semaphore.count--;
      return Promise.resolve();
    }
    return new Promise((resolve) => waitQueue.push(resolve));
  }

  function release(): void {
    const next = waitQueue.shift();
    if (next) {
      next();
    } else {
      semaphore.count++;
    }
  }

  const runTask = async (task: string, idx: number): Promise<[number, string]> => {
    await acquire();
    try {
      const results: string[] = [];
      for await (const message of query(task, {
        allowedTools: [],
        maxTurns: 2,
      } as ClaudeCodeOptions)) {
        if ("content" in message) {
          const content = String((message as { content: unknown }).content);
          if (content) results.push(content);
        }
      }
      return [idx, results.join("\n")];
    } finally {
      release();
    }
  };

  const results = await Promise.all(tasks.map((t, i) => runTask(t, i)));
  return results.sort(([a], [b]) => a - b).map(([, r]) => r);
}

export { runAllConcurrently };

14. Debugging Avanzado del SDK

Variables de Entorno para Debug

# Debug completo del SDK y CLI
export CLAUDE_CODE_DEBUG=1
export ANTHROPIC_LOG_LEVEL=debug
export OTEL_LOG_LEVEL=debug
export PYTHONASYNCIODEBUG=1        # Python: detectar coroutines bloqueadas

# Ver subprocesos que lanza el SDK (Node.js)
export NODE_DEBUG=child_process

# Strace en Linux para ver syscalls del proceso claude
strace -f -e trace=read,write,open,close -p $(pgrep -f "claude -p") 2>&1 | head -200

Capturar stdin/stdout del Subproceso

# subprocess_inspector.py
"""
Inspección completa del subproceso: qué entra y qué sale.
"""
import asyncio
import json
import time
from pathlib import Path


class SubprocessInspector:
    """
    Wrapper transparente que registra todo lo que entra y sale del proceso claude.
    """

    def __init__(self, capture_dir: str = "/tmp/claude_inspector"):
        self.capture_dir = Path(capture_dir)
        self.capture_dir.mkdir(exist_ok=True)
        self.session_id = str(int(time.time()))

    async def run(self, prompt: str, cli_args: list[str] | None = None) -> None:
        """Ejecuta claude con inspección completa."""
        args = cli_args or ["-p", "--output-format", "stream-json"]

        process = await asyncio.create_subprocess_exec(
            "claude", *args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdin_log = self.capture_dir / f"{self.session_id}_stdin.txt"
        stdin_log.write_text(prompt + "\n")
        print(f"[INSPECTOR] stdin → {stdin_log}")

        process.stdin.write(prompt.encode() + b"\n")
        await process.stdin.drain()
        process.stdin.close()

        stdout_lines = []
        stderr_lines = []

        async def capture_stdout():
            async for line in process.stdout:
                decoded = line.decode()
                stdout_lines.append(decoded)
                stripped = decoded.strip()
                if stripped:
                    try:
                        msg = json.loads(stripped)
                        print(f"[STDOUT] type={msg.get('type', '?')}")
                    except Exception:
                        print(f"[STDOUT-RAW] {stripped[:80]}")

        async def capture_stderr():
            async for line in process.stderr:
                decoded = line.decode()
                stderr_lines.append(decoded)
                if decoded.strip():
                    print(f"[STDERR] {decoded.strip()[:120]}")

        await asyncio.gather(capture_stdout(), capture_stderr())
        returncode = await process.wait()

        stdout_log = self.capture_dir / f"{self.session_id}_stdout.jsonl"
        stdout_log.write_text("".join(stdout_lines))

        stderr_log = self.capture_dir / f"{self.session_id}_stderr.txt"
        stderr_log.write_text("".join(stderr_lines))

        print(f"\n[INSPECTOR] Exit code: {returncode}")
        print(f"  stdout → {stdout_log}")
        print(f"  stderr → {stderr_log}")

        # Análisis rápido
        type_counts: dict[str, int] = {}
        total_cost = 0.0
        for line in stdout_lines:
            try:
                msg = json.loads(line.strip())
                t = msg.get("type", "unknown")
                type_counts[t] = type_counts.get(t, 0) + 1
                if t == "result":
                    total_cost += float(msg.get("cost_usd") or 0)
            except Exception:
                pass

        print(f"  Tipos de mensajes: {type_counts}")
        print(f"  Costo total: ${total_cost:.4f}")

Técnicas para Debuggear Agentes Colgados

# debug_hanging.py
"""
Técnicas para debuggear agentes que parecen haberse colgado.
"""
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions


async def agent_with_heartbeat(
    prompt: str,
    heartbeat_interval: float = 30.0
) -> str:
    """
    Ejecuta el agente con heartbeat que muestra que está vivo.
    Si no hay mensajes en heartbeat_interval segundos,
    puede indicar que el agente está esperando al LLM o atascado.
    """
    results = []
    last_message_time = asyncio.get_event_loop().time()

    async def heartbeat():
        while True:
            await asyncio.sleep(heartbeat_interval)
            elapsed = asyncio.get_event_loop().time() - last_message_time
            print(
                f"[HEARTBEAT] Sin mensajes por {elapsed:.0f}s. "
                f"¿Esperando LLM? ¿Herramienta bloqueada?"
            )

    heartbeat_task = asyncio.create_task(heartbeat())

    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"], max_turns=50)
        ):
            last_message_time = asyncio.get_event_loop().time()
            msg_type = type(message).__name__
            print(f"[ALIVE] {msg_type}")

            if hasattr(message, 'content') and message.content:
                results.append(str(message.content))
    finally:
        heartbeat_task.cancel()

    return "\n".join(results)


async def dump_process_stack_if_hung(prompt: str, timeout_seconds: float = 60.0) -> None:
    """
    En Linux: si el agente no responde en timeout_seconds,
    captura el stack trace del proceso usando py-spy.
    """
    import subprocess
    import os

    process = await asyncio.create_subprocess_exec(
        "claude", "-p", "--output-format", "stream-json",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env={**os.environ}
    )

    process.stdin.write(prompt.encode() + b"\n")
    await process.stdin.drain()
    process.stdin.close()

    try:
        async with asyncio.timeout(timeout_seconds):
            async for raw_line in process.stdout:
                if raw_line.strip():
                    print(f"[OK] {raw_line.decode().strip()[:100]}")

    except asyncio.TimeoutError:
        pid = process.pid
        print(f"[HUNG] Proceso {pid} colgado — capturando diagnóstico")

        # py-spy: stack trace del proceso Python
        try:
            result = subprocess.run(
                ["py-spy", "dump", "--pid", str(pid)],
                capture_output=True, text=True, timeout=10
            )
            print("[PY-SPY STACK]")
            print(result.stdout or result.stderr)
        except (FileNotFoundError, subprocess.TimeoutExpired):
            print("[WARN] py-spy no disponible")

        process.kill()
        await process.wait()


def reproduce_session_from_log(log_file: str) -> None:
    """
    Carga una sesión de producción desde los logs y la reproduce localmente.
    Permite debuggear el problema exacto que ocurrió en producción.
    """
    import json
    from pathlib import Path

    log_path = Path(log_file)
    if not log_path.exists():
        print(f"[ERROR] Log file no encontrado: {log_file}")
        return

    sessions: dict[str, list[dict]] = {}

    with open(log_path) as f:
        for line in f:
            try:
                entry = json.loads(line.strip())
                sid = entry.get("session_id", "unknown")
                sessions.setdefault(sid, []).append(entry)
            except json.JSONDecodeError:
                pass

    print(f"[REPRODUCE] Encontradas {len(sessions)} sesiones en {log_file}")
    for session_id, events in list(sessions.items())[:5]:
        print(f"\n  Sesión: {session_id}")
        for event in events:
            print(f"    [{event.get('event', '?')}] tool={event.get('tool', '-')} cost=${event.get('cost_usd', 0):.4f}")