Capítulo 20: SDK Internals y Extensiones
Capítulo 20: SDK Internals y Extensiones
Entender cómo funciona el SDK por dentro no solo satisface la curiosidad técnica — permite debuggear problemas difíciles, crear extensiones robustas y aprovechar características no documentadas. Este capítulo abre la caja negra del Claude Code SDK.
1. Cómo Funciona el SDK Internamente
El SDK Spawna claude -p como Subproceso
La arquitectura más sorprendente del SDK es que no llama directamente a la API de Anthropic. En cambio, spawna el binario claude (Claude Code CLI) como un subproceso hijo y se comunica con él mediante JSON sobre stdio.
flowchart LR
subgraph TuApp["Tu Aplicación"]
PythonCode["Tu código\nPython/TS"]
SDK["Claude Code SDK\nquery()"]
end
subgraph Proceso["Proceso Hijo"]
CLI["claude -p\nClaude Code CLI"]
subgraph CLIInternal["Dentro del CLI"]
AnthAPI["Cliente API\nAnthropic"]
ToolExec["Ejecutor\nde Herramientas"]
AgentLoop["Agent Loop"]
end
end
subgraph External["Externos"]
AnthServer["Anthropic\nAPI Server"]
FileSystem["Sistema\nde Archivos"]
Terminal["Terminal\n(Bash)"]
end
PythonCode --> SDK
SDK --> |"spawn subprocess\nstdin/stdout"| CLI
CLI --> AnthAPI
AnthAPI --> |"HTTPS"| AnthServer
AnthServer --> AnthAPI
AnthAPI --> AgentLoop
AgentLoop --> ToolExec
ToolExec --> FileSystem
ToolExec --> Terminal
CLI --> |"JSON messages\nstdout"| SDK
SDK --> PythonCode
Protocolo de Comunicación: JSON Lines sobre Stdio
La comunicación entre el SDK y el proceso claude es mediante JSON Lines (un objeto JSON por línea) sobre stdio:
stdin → tu código envía el prompt como JSON
stdout ← el CLI emite eventos como JSON lines
stderr ← logs y errores del CLI (no se parsean)
Formato del mensaje de entrada (stdin):
{
"prompt": "Encuentra el bug en auth.py",
"options": {
"allowed_tools": ["Read", "Edit", "Bash"],
"max_turns": 50,
"model": "claude-opus-4-5",
"system_prompt": "Eres un experto en seguridad"
}
}
Mensajes de salida (stdout) — JSON Lines:
{"type": "system", "subtype": "init", "session_id": "abc123", "tools": [...]}
{"type": "assistant", "message": {"role": "assistant", "content": [{"type": "text", "text": "Analizando auth.py..."}]}}
{"type": "assistant", "message": {"role": "assistant", "content": [{"type": "tool_use", "id": "tu_001", "name": "Read", "input": {"file_path": "auth.py"}}]}}
{"type": "tool", "tool_use_id": "tu_001", "tool_name": "Read", "content": "...contenido del archivo..."}
{"type": "result", "subtype": "success", "result": "El bug está en la línea 42...", "session_id": "abc123", "cost_usd": 0.0234}
Parsing del Output del Subproceso
sequenceDiagram
participant SDK
participant Claude as claude -p
SDK->>Claude: spawn(["claude", "-p"], stdin=PIPE, stdout=PIPE)
SDK->>Claude: stdin.write(json_prompt + "\n")
SDK->>Claude: stdin.close()
loop Streaming de mensajes
Claude->>SDK: stdout.readline() → JSON line
SDK->>SDK: json.loads(line)
SDK->>SDK: yield Message object al caller
end
Claude->>SDK: EOF en stdout
SDK->>SDK: Cerrar proceso
Implementación simplificada del SDK (conceptual):
# sdk_internals_explained.py
# Este código es CONCEPTUAL para entender el SDK, no el código real
import asyncio
import json
import subprocess
from typing import AsyncGenerator, Any
async def query_internal(
prompt: str,
allowed_tools: list[str],
max_turns: int
) -> AsyncGenerator[dict, None]:
"""
Implementación conceptual de cómo funciona query() internamente.
El SDK real es más complejo y robusto.
"""
# 1. Preparar el comando
cmd = ["claude", "-p", "--output-format", "stream-json", "--verbose"]
if max_turns:
cmd.extend(["--max-turns", str(max_turns)])
for tool in allowed_tools:
cmd.extend(["--allowedTools", tool])
# 2. Spawnar el subproceso con stdin/stdout pipes
process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# 3. Enviar el prompt al subproceso via stdin
prompt_json = json.dumps({"prompt": prompt}) + "\n"
process.stdin.write(prompt_json.encode())
await process.stdin.drain()
process.stdin.close()
# 4. Leer y parsear JSON Lines del stdout
async for raw_line in process.stdout:
line = raw_line.decode().strip()
if not line:
continue
try:
message = json.loads(line)
yield message
except json.JSONDecodeError:
# El SDK real maneja estos errores gracefully
continue
# 5. Esperar que el proceso termine
await process.wait()
Por Qué Subproceso en vez de API Directa
La razón de este diseño es fundamental: Claude Code tiene capacidades que van mucho más allá de una API conversacional simple:
- Gestión de sesiones persistentes: El CLI mantiene el estado entre llamadas
- Sistema de herramientas complejo: Read, Write, Edit, Bash, Browser, etc. están implementados en el CLI
- MCP (Model Context Protocol): El CLI gestiona las conexiones MCP
- Hooks: Los hooks de PreToolUse, PostToolUse se ejecutan en el proceso del CLI
- Seguridad de herramientas: El CLI aplica sandboxing y validaciones
- Formato de salida: El CLI parsea las respuestas del LLM y extrae tool calls
Esto significa que el SDK es esencialmente un thin wrapper alrededor del CLI que:
- Spawna el proceso
- Serializa la entrada a JSON
- Deserializa la salida de JSON Lines
- Maneja errores del proceso
- Provee una API async/generator conveniente
Manejo del stdout/stderr
# entendiendo_stderr.py
import asyncio
import json
import subprocess
import sys
async def run_claude_with_debug(prompt: str) -> None:
"""
Muestra la diferencia entre stdout (mensajes del SDK) y stderr (logs del CLI).
"""
process = await asyncio.create_subprocess_exec(
"claude", "-p",
"--output-format", "stream-json",
"--verbose",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
process.stdin.write(prompt.encode() + b"\n")
await process.stdin.drain()
process.stdin.close()
# Leer stdout y stderr concurrentemente
async def read_stdout():
async for line in process.stdout:
decoded = line.decode().strip()
if decoded:
try:
msg = json.loads(decoded)
# stdout = mensajes estructurados del agente
print(f"[MENSAJE SDK] tipo={msg.get('type')} subtipo={msg.get('subtype', '')}")
except json.JSONDecodeError:
print(f"[STDOUT RAW] {decoded}")
async def read_stderr():
async for line in process.stderr:
decoded = line.decode().strip()
if decoded:
# stderr = logs del CLI (útil para debugging del CLI mismo)
print(f"[CLI LOG] {decoded}", file=sys.stderr)
await asyncio.gather(read_stdout(), read_stderr())
await process.wait()
print(f"[PROCESO] Código de salida: {process.returncode}")
2. Estructura del Repositorio del SDK
Repositorios Oficiales
flowchart LR
subgraph Repos["Repositorios GitHub"]
PyRepo["anthropics/claude-code-sdk-python\ngithub.com/anthropics/claude-code-sdk-python"]
JSRepo["anthropics/claude-code-sdk-js\ngithub.com/anthropics/claude-code-sdk-js"]
CLI["anthropics/claude-code\ngithub.com/anthropics/claude-code\n(el binario que el SDK spawna)"]
end
Módulos Principales del SDK Python
claude_code_sdk/
├── __init__.py # Exports públicos: query, ClaudeCodeOptions, tipos
├── _client.py # ClaudeSDKClient - clase principal del cliente
├── _process.py # ProcessManager - gestión del subproceso
├── _messages.py # Tipos de mensajes (AssistantMessage, ResultMessage, etc.)
├── _errors.py # Excepciones del SDK
└── py.typed # Marker file para type checking
Módulos principales del SDK TypeScript:
src/
├── index.ts # Exports públicos
├── client.ts # ClaudeSDKClient
├── process.ts # Gestión del subproceso Node.js
├── types.ts # Interfaces TypeScript
└── errors.ts # Clases de error
Dependencias del SDK
Python (pyproject.toml):
[project]
name = "claude-code-sdk"
dependencies = [
"anyio>=4.0.0", # Async I/O compatible con asyncio y trio
]
# Solo dependencia: anyio para async subprocess
# Sin HTTP client (no llama directamente a la API)
TypeScript (package.json):
{
"dependencies": {
"execa": "^9.0.0"
}
}
Las dependencias son intencionalmente mínimas porque el SDK no hace networking — solo gestiona subprocesos y parsea JSON.
3. ClaudeSDKClient — Uso Avanzado
Diferencia entre query() y ClaudeSDKClient
flowchart LR
subgraph query["query() — Simple"]
q1["Una query\nUn proceso\nUna sesión\nSe cierra al terminar"]
end
subgraph client["ClaudeSDKClient — Avanzado"]
c1["Múltiples queries\nMismo proceso (posible)\nSesión persistente\nControl explícito de lifecycle"]
end
query --> |"Para la mayoría\nde los casos"| UseQuery["Usar query()"]
client --> |"Para sesiones largas\nde larga duración"| UseClient["Usar ClaudeSDKClient"]
Uso Completo del Client
# claude_client_advanced.py
import asyncio
from claude_code_sdk import ClaudeCodeOptions
# Importación del client (verificar en la versión actual del SDK)
# from claude_code_sdk._client import ClaudeSDKClient
async def demo_client_usage():
"""
Demonstración de ClaudeSDKClient para sesiones de larga duración.
El Client es útil cuando:
1. Tienes múltiples prompts relacionados en una misma sesión
2. Necesitas control explícito sobre cuándo empieza/termina la sesión
3. Quieres reusar el contexto entre queries
"""
options = ClaudeCodeOptions(
allowed_tools=["Read", "Edit", "Bash"],
max_turns=50,
system_prompt="Eres un asistente de desarrollo especializado en Python."
)
# Context manager gestiona el lifecycle del proceso
# async with ClaudeSDKClient(options=options) as client:
# # Primera query en la sesión
# async for message in client.process_query("¿Qué archivos Python hay?"):
# if hasattr(message, 'content'):
# print(message.content)
#
# # Segunda query — MISMO proceso, MISMO contexto
# async for message in client.process_query("Analiza el más grande"):
# if hasattr(message, 'content'):
# print(message.content)
#
# # Al salir del context manager, el proceso se cierra limpiamente
async def when_to_use_client_vs_query():
"""
Guía de cuándo usar query() vs Client.
"""
# ✅ USA query() para:
# - Tareas independientes (no necesitan contexto de la anterior)
# - Código más simple y directo
# - Cuando cada tarea es completamente diferente
from claude_code_sdk import query
# Tarea 1 completamente independiente
async for msg in query(
"Lista los archivos en src/",
options=ClaudeCodeOptions(allowed_tools=["Bash"])
):
pass
# Tarea 2 completamente independiente
async for msg in query(
"¿Cuántas líneas tiene el archivo README.md?",
options=ClaudeCodeOptions(allowed_tools=["Bash"])
):
pass
# ✅ USA ClaudeSDKClient cuando:
# - Las tareas son secuenciales y relacionadas
# - La segunda tarea depende del resultado de la primera
# - Quieres mantener el historial de la conversación
# Ejemplo hipotético (verificar API actual del SDK):
# async with ClaudeSDKClient(...) as client:
# await client.query("Lee auth.py y entiende la estructura")
# await client.query("Ahora agrega validación de JWT") # Sabe sobre auth.py
Context Manager y Cierre Seguro
# safe_client_usage.py
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def safe_long_running_agent(tasks: list[str]) -> list[str]:
"""
Para sesiones largas, usar query() con session_id para reanudar.
Este es el patrón recomendado para la mayoría de los casos.
"""
results = []
session_id = None
for task in tasks:
task_results = []
async for message in query(
prompt=task,
options=ClaudeCodeOptions(
resume=session_id, # Reanudar sesión si existe
allowed_tools=["Read", "Edit", "Bash"],
max_turns=30
)
):
# Capturar session_id del primer mensaje
if hasattr(message, 'session_id') and message.session_id:
session_id = message.session_id
if hasattr(message, 'content') and message.content:
task_results.append(str(message.content))
results.append("\n".join(task_results))
print(f"Tarea completada. Session ID: {session_id}")
return results
4. Tipos TypeScript en Profundidad
Interfaces Completas del SDK
// sdk-types-deep.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
// ============================================================
// Tipos de mensajes del SDK
// ============================================================
interface BaseMessage {
type: string;
}
interface SystemMessage extends BaseMessage {
type: "system";
subtype: "init" | "result";
session_id?: string;
}
interface AssistantMessage extends BaseMessage {
type: "assistant";
message: {
role: "assistant";
content: Array<TextBlock | ToolUseBlock>;
};
}
interface TextBlock {
type: "text";
text: string;
}
interface ToolUseBlock {
type: "tool_use";
id: string;
name: string;
input: Record<string, unknown>;
}
interface UserMessage extends BaseMessage {
type: "user";
message: {
role: "user";
content: Array<ToolResultBlock>;
};
}
interface ToolResultBlock {
type: "tool_result";
tool_use_id: string;
content: string;
is_error?: boolean;
}
interface ResultMessage extends BaseMessage {
type: "result";
subtype: "success" | "error";
result?: string;
session_id?: string;
cost_usd?: number;
duration_ms?: number;
num_turns?: number;
usage?: {
input_tokens: number;
output_tokens: number;
cache_creation_input_tokens?: number;
cache_read_input_tokens?: number;
};
}
// Union type de todos los mensajes posibles
type SDKMessage =
| SystemMessage
| AssistantMessage
| UserMessage
| ResultMessage;
// ============================================================
// Type Guards para mensajes
// ============================================================
function isResultMessage(msg: SDKMessage): msg is ResultMessage {
return msg.type === "result";
}
function isAssistantMessage(msg: SDKMessage): msg is AssistantMessage {
return msg.type === "assistant";
}
function hasTextContent(
msg: SDKMessage
): msg is AssistantMessage & { message: { content: [TextBlock, ...unknown[]] } } {
if (!isAssistantMessage(msg)) return false;
return msg.message.content.some((block) => block.type === "text");
}
function isToolUse(msg: SDKMessage): boolean {
if (!isAssistantMessage(msg)) return false;
return msg.message.content.some((block) => block.type === "tool_use");
}
// ============================================================
// Generics para herramientas custom
// ============================================================
interface TypedToolInput<T extends Record<string, unknown>> {
name: string;
input: T;
}
interface BashToolInput extends Record<string, unknown> {
command: string;
timeout?: number;
}
interface ReadToolInput extends Record<string, unknown> {
file_path: string;
offset?: number;
limit?: number;
}
function extractToolCall<T extends Record<string, unknown>>(
msg: AssistantMessage,
toolName: string
): TypedToolInput<T> | null {
for (const block of msg.message.content) {
if (block.type === "tool_use" && block.name === toolName) {
return {
name: block.name,
input: block.input as T,
};
}
}
return null;
}
// ============================================================
// Utility types para el SDK
// ============================================================
// Extraer el tipo del costo de ResultMessage
type CostInfo = Pick<ResultMessage, "cost_usd" | "usage">;
// Tipo para una función que procesa mensajes del SDK
type MessageProcessor<T> = (message: SDKMessage) => T | undefined;
// Tipo para agregar resultados
type ResultAggregator<T> = (
messages: SDKMessage[]
) => T;
// Helper type: extraer solo mensajes de resultado
type OnlyResults = Extract<SDKMessage, { type: "result" }>;
// ============================================================
// Uso con tipos estrictos
// ============================================================
async function typedAgentRunner(
prompt: string,
tools: string[]
): Promise<{
finalResult: string | undefined;
cost: CostInfo | undefined;
toolCalls: Array<{ name: string; input: Record<string, unknown> }>;
}> {
const messages: SDKMessage[] = [];
const toolCalls: Array<{ name: string; input: Record<string, unknown> }> = [];
for await (const message of query(prompt, {
allowedTools: tools,
} as ClaudeCodeOptions)) {
const typed = message as SDKMessage;
messages.push(typed);
if (isAssistantMessage(typed)) {
for (const block of typed.message.content) {
if (block.type === "tool_use") {
toolCalls.push({ name: block.name, input: block.input });
}
}
}
}
const resultMsg = messages.find(isResultMessage);
return {
finalResult: resultMsg?.result,
cost: resultMsg
? { cost_usd: resultMsg.cost_usd, usage: resultMsg.usage }
: undefined,
toolCalls,
};
}
export {
SDKMessage,
ResultMessage,
AssistantMessage,
isResultMessage,
isAssistantMessage,
hasTextContent,
isToolUse,
extractToolCall,
typedAgentRunner,
BashToolInput,
ReadToolInput,
};
5. Extender el SDK
Wrapper sobre query() con Comportamiento Adicional
# enterprise_sdk.py
"""
Wrapper empresarial sobre el Claude Code SDK con:
- Autenticación y autorización
- Logging estructurado automático
- Retry con backoff exponencial
- Rate limiting
- Auditoría de acciones
"""
import asyncio
import time
import structlog
from typing import AsyncGenerator, Any
from functools import wraps
from tenacity import (
AsyncRetrying,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
from claude_code_sdk import query, ClaudeCodeOptions
logger = structlog.get_logger()
class EnterpriseAgentSDK:
"""
Wrapper empresarial sobre el SDK con features adicionales.
Principio Open/Closed: extiende sin modificar el SDK original.
"""
def __init__(
self,
api_key_validator: Any | None = None,
rate_limiter: Any | None = None,
audit_trail: Any | None = None,
max_retries: int = 3,
allowed_tools_policy: dict[str, list[str]] | None = None
):
"""
Args:
api_key_validator: Validador de API keys de usuario
rate_limiter: Rate limiter por usuario
audit_trail: Sistema de audit trail
max_retries: Número máximo de reintentos en error transitorio
allowed_tools_policy: Política de herramientas por rol de usuario
"""
self._api_key_validator = api_key_validator
self._rate_limiter = rate_limiter
self._audit_trail = audit_trail
self._max_retries = max_retries
self._allowed_tools_policy = allowed_tools_policy or {
"admin": ["Read", "Edit", "Write", "Bash"],
"developer": ["Read", "Edit", "Bash"],
"readonly": ["Read", "Bash"],
}
def _get_tools_for_role(self, user_role: str, requested_tools: list[str]) -> list[str]:
"""Aplica política de herramientas según el rol del usuario."""
allowed = set(self._allowed_tools_policy.get(user_role, ["Read"]))
return [t for t in requested_tools if t in allowed]
async def query(
self,
prompt: str,
user_id: str,
user_role: str = "developer",
options: ClaudeCodeOptions | None = None,
metadata: dict | None = None
) -> AsyncGenerator[Any, None]:
"""
Wrapper sobre query() con todas las features empresariales.
"""
log = logger.bind(user_id=user_id, user_role=user_role)
# 1. Validar autorización
if self._api_key_validator:
authorized = await self._api_key_validator.validate(user_id)
if not authorized:
raise PermissionError(f"Usuario {user_id} no autorizado")
# 2. Aplicar rate limiting
if self._rate_limiter:
allowed = await self._rate_limiter.check(user_id)
if not allowed:
raise RuntimeError(f"Rate limit excedido para usuario {user_id}")
# 3. Aplicar política de herramientas
if options and options.allowed_tools:
safe_tools = self._get_tools_for_role(user_role, options.allowed_tools)
if safe_tools != options.allowed_tools:
log.warning(
"tools_restricted_by_policy",
requested=options.allowed_tools,
allowed=safe_tools
)
options = ClaudeCodeOptions(
allowed_tools=safe_tools,
max_turns=getattr(options, 'max_turns', 50),
system_prompt=getattr(options, 'system_prompt', None),
)
# 4. Audit trail: registrar inicio
if self._audit_trail:
await self._audit_trail.append(
user_id=user_id,
session_id=None,
agent_name="enterprise_agent",
action_type="query_started",
action_details={"prompt_preview": prompt[:200], "metadata": metadata or {}}
)
log.info("enterprise_query_started", prompt_preview=prompt[:100])
# 5. Ejecutar con retry
async def _execute():
async for message in query(prompt, options):
yield message
async with AsyncRetrying(
stop=stop_after_attempt(self._max_retries),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type((RuntimeError, TimeoutError)),
reraise=True
):
async for message in _execute():
yield message
log.info("enterprise_query_completed")
# ============================================================
# Plugin system para el SDK
# ============================================================
class SDKPlugin:
"""Interface base para plugins del SDK."""
async def on_message(self, message: Any) -> Any:
"""Procesar cada mensaje. Retornar el mensaje (posiblemente modificado)."""
return message
async def on_tool_call(self, tool_name: str, tool_input: dict) -> dict | None:
"""Interceptar llamadas a herramientas. None = no modificar."""
return None
async def on_query_complete(self, result: str, cost_usd: float) -> None:
"""Ejecutar al completar una query."""
pass
class CostBudgetPlugin(SDKPlugin):
"""Plugin que detiene la ejecución si se excede el presupuesto."""
def __init__(self, max_cost_usd: float):
self.max_cost_usd = max_cost_usd
self.accumulated_cost = 0.0
async def on_message(self, message: Any) -> Any:
if hasattr(message, 'cost_usd') and message.cost_usd:
self.accumulated_cost += float(message.cost_usd)
if self.accumulated_cost > self.max_cost_usd:
raise RuntimeError(
f"Presupuesto excedido: ${self.accumulated_cost:.4f} > ${self.max_cost_usd}"
)
return message
class PluggableAgentRunner:
"""Runner que aplica plugins en cada mensaje."""
def __init__(self, plugins: list[SDKPlugin] | None = None):
self.plugins = plugins or []
async def run(self, prompt: str, options: ClaudeCodeOptions | None = None) -> str:
results = []
async for message in query(prompt, options):
# Aplicar plugins
processed = message
for plugin in self.plugins:
processed = await plugin.on_message(processed)
if hasattr(processed, 'content') and processed.content:
results.append(str(processed.content))
return "\n".join(results)
Middleware Pattern para Hooks
// middleware-pattern.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
type MessageMiddleware = (
message: unknown,
next: (msg: unknown) => Promise<void>
) => Promise<void>;
class AgentPipeline {
private middlewares: MessageMiddleware[] = [];
use(middleware: MessageMiddleware): this {
this.middlewares.push(middleware);
return this;
}
private buildStack(): (msg: unknown) => Promise<void> {
const stack = [...this.middlewares].reverse();
return stack.reduce(
(next, middleware) => (msg) => middleware(msg, next),
async (_msg: unknown) => {}
);
}
async run(prompt: string, options?: ClaudeCodeOptions): Promise<string[]> {
const results: string[] = [];
const processMessage = this.buildStack();
for await (const message of query(prompt, options ?? ({} as ClaudeCodeOptions))) {
await processMessage(message);
if ("content" in (message as object)) {
const content = String((message as { content: unknown }).content);
if (content) results.push(content);
}
}
return results;
}
}
// Middlewares reutilizables
const loggingMiddleware: MessageMiddleware = async (message, next) => {
const type = (message as { type?: string }).type;
console.log(`[LOG] Message type: ${type}`);
await next(message);
};
const costTrackingMiddleware = (budget: number): MessageMiddleware => {
let accumulated = 0;
return async (message, next) => {
if ("cost_usd" in (message as object)) {
accumulated += Number((message as { cost_usd: unknown }).cost_usd) || 0;
if (accumulated > budget) {
throw new Error(`Budget exceeded: $${accumulated.toFixed(4)}`);
}
}
await next(message);
};
};
// Uso
async function runWithMiddleware(prompt: string): Promise<string[]> {
const pipeline = new AgentPipeline()
.use(loggingMiddleware)
.use(costTrackingMiddleware(1.0)); // Max $1.00
return pipeline.run(prompt, {
allowedTools: ["Read", "Bash"],
} as ClaudeCodeOptions);
}
export { AgentPipeline, loggingMiddleware, costTrackingMiddleware };
6. Contribuir al SDK
Proceso de Contribución
flowchart TD
Issue["Crear/buscar Issue\nen GitHub"] --> Fork["Fork del repo"]
Fork --> Branch["Crear branch\nfeat/mi-feature o fix/mi-bug"]
Branch --> Code["Implementar cambios"]
Code --> Tests["Agregar/actualizar tests"]
Tests --> Lint["Pasar linters\nruff, mypy, pytest"]
Lint --> PR["Abrir Pull Request"]
PR --> Review["Code Review\npor maintainers"]
Review --> |"Cambios solicitados"| Code
Review --> |"Aprobado"| Merge["Merge a main"]
Setup para Desarrollo Local
# Clonar el repo del SDK Python
git clone https://github.com/anthropics/claude-code-sdk-python
cd claude-code-sdk-python
# Instalar en modo desarrollo
pip install -e ".[dev]"
# Verificar que los tests pasan
pytest tests/ -v
# Correr linters
ruff check .
mypy .
# Para el SDK TypeScript
git clone https://github.com/anthropics/claude-code-sdk-js
cd claude-code-sdk-js
npm install
npm test
npm run typecheck
npm run lint
Testing Local del SDK Modificado
# test_local_sdk.py
"""
Cómo testear cambios al SDK localmente antes de abrir un PR.
"""
import asyncio
import sys
# Agregar el SDK local al path (para probar tu versión modificada)
# sys.path.insert(0, "/ruta/a/tu/fork/claude-code-sdk-python")
from claude_code_sdk import query, ClaudeCodeOptions
async def test_modification():
"""Test básico para verificar que tu modificación funciona."""
results = []
async for message in query(
"Di hola",
options=ClaudeCodeOptions(
allowed_tools=[],
max_turns=1
)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
assert len(results) > 0, "El SDK debería retornar al menos un mensaje"
print("✓ Test pasó")
asyncio.run(test_modification())
Cómo Reportar Bugs
## Título del Issue
Describe el problema en una línea clara
## Comportamiento esperado
Lo que debería pasar
## Comportamiento actual
Lo que está pasando
## Reproducción mínima
```python
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def reproduce():
async for msg in query("prompt que causa el bug", options=ClaudeCodeOptions()):
print(msg)
asyncio.run(reproduce())
Versión del SDK
claude-code-sdk: X.Y.Z Python: 3.12.X OS: Ubuntu 22.04
Stack trace
...
---
## 7. Changelog y Migración entre Versiones
### Cambios Breaking entre Versiones
```mermaid
timeline
title Evolución del SDK
0.0.1 : Lanzamiento inicial
: query() básica
: ClaudeCodeOptions
0.0.5 : Soporte MCP
: Herramientas personalizadas
: Mejoras en tipos
0.0.10 : Hooks PreToolUse PostToolUse
: session_id para reanudación
: cost_usd en ResultMessage
0.0.14+ : ClaudeSDKClient
: Mejoras de rendimiento
: Type hints mejorados
Guía de Migración
# migration_guide.py
# ============================================================
# ANTES (versiones antiguas):
# ============================================================
# Importación antigua (si el paquete se llamaba diferente)
# from claude_agent_sdk import query, ClaudeAgentOptions
# ============================================================
# AHORA (versión actual):
# ============================================================
# Importación correcta
from claude_code_sdk import query, ClaudeCodeOptions
# Cambio: ClaudeAgentOptions → ClaudeCodeOptions
# Antes:
# options = ClaudeAgentOptions(allowed_tools=["Read"])
# Ahora:
options = ClaudeCodeOptions(allowed_tools=["Read"])
# Cambio: campo max_turns (verificar nombre exacto en la versión actual)
options_modern = ClaudeCodeOptions(
allowed_tools=["Read", "Bash"],
max_turns=50,
)
# ============================================================
# Manejo de Deprecations
# ============================================================
import warnings
def safe_create_options(**kwargs) -> ClaudeCodeOptions:
"""
Wrapper que maneja campos deprecados gracefully.
"""
# Filtrar campos que ya no existen
known_fields = {"allowed_tools", "max_turns", "system_prompt", "resume", "model"}
unknown = {k: v for k, v in kwargs.items() if k not in known_fields}
if unknown:
warnings.warn(
f"Campos desconocidos en ClaudeCodeOptions serán ignorados: {list(unknown.keys())}",
DeprecationWarning,
stacklevel=2
)
safe_kwargs = {k: v for k, v in kwargs.items() if k in known_fields}
return ClaudeCodeOptions(**safe_kwargs)
8. Interoperabilidad
Comparación Final de Frameworks
flowchart LR
subgraph CCS["Claude Code SDK"]
direction TB
CCS1["✓ Acceso completo\na herramientas de Claude Code"]
CCS2["✓ MCP nativo"]
CCS3["✓ Hooks de herramientas"]
CCS4["✗ Solo Claude"]
CCS5["✗ Sin orquestación\ncompleja incluida"]
end
subgraph LC["LangChain"]
direction TB
LC1["✓ Multi-modelo"]
LC2["✓ Ecosistema enorme"]
LC3["✓ Chains y agents"]
LC4["✗ Overhead de abstracción"]
LC5["✗ No acceso a\nherramientas de Claude Code"]
end
subgraph CA["CrewAI"]
direction TB
CA1["✓ Multi-agente\nout of the box"]
CA2["✓ Roles y tareas\nbien definidas"]
CA3["✗ Solo para Claude/OpenAI"]
CA4["✗ Menos control fino"]
end
Adapter Pattern: Cambiar de Framework
# adapter_pattern.py
"""
Adapter que permite cambiar de Claude Code SDK a otro framework
sin cambiar el código de negocio.
"""
from abc import ABC, abstractmethod
from typing import AsyncGenerator
class AgentAdapter(ABC):
"""Interface común para todos los frameworks de agentes."""
@abstractmethod
async def run(self, prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
"""Ejecutar el agente y retornar tokens/chunks del resultado."""
pass
@abstractmethod
async def run_complete(self, prompt: str, tools: list[str]) -> str:
"""Ejecutar el agente y retornar el resultado completo."""
pass
class ClaudeCodeAdapter(AgentAdapter):
"""Adapter para el Claude Code SDK."""
async def run(self, prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
from claude_code_sdk import query, ClaudeCodeOptions
async for message in query(
prompt,
options=ClaudeCodeOptions(allowed_tools=tools)
):
if hasattr(message, 'content') and message.content:
yield str(message.content)
async def run_complete(self, prompt: str, tools: list[str]) -> str:
chunks = []
async for chunk in self.run(prompt, tools):
chunks.append(chunk)
return "\n".join(chunks)
class MockAdapter(AgentAdapter):
"""Adapter mock para tests."""
def __init__(self, responses: list[str]):
self._responses = iter(responses)
async def run(self, prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
try:
response = next(self._responses)
yield response
except StopIteration:
yield "Mock response"
async def run_complete(self, prompt: str, tools: list[str]) -> str:
chunks = []
async for chunk in self.run(prompt, tools):
chunks.append(chunk)
return "\n".join(chunks)
# Código de negocio que usa el adapter (no sabe qué framework usa)
class CodeAnalysisService:
def __init__(self, agent: AgentAdapter):
self._agent = agent
async def analyze_file(self, file_path: str) -> str:
return await self._agent.run_complete(
prompt=f"Analiza el archivo {file_path} y encuentra problemas",
tools=["Read", "Bash"]
)
# Cambiar entre implementaciones sin tocar el servicio
async def main():
# En producción
service = CodeAnalysisService(agent=ClaudeCodeAdapter())
# En tests
test_service = CodeAnalysisService(
agent=MockAdapter(["El archivo tiene 3 problemas: ..."])
)
result = await test_service.analyze_file("src/auth.py")
print(result)
Usar con LangChain
# langchain_interop.py
"""
Ejemplo de cómo usar Claude Code SDK como un tool de LangChain.
"""
from typing import Optional, Type
# from langchain.tools import BaseTool # Requiere langchain instalado
# from pydantic import BaseModel, Field
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
# class ClaudeCodeInput(BaseModel):
# prompt: str = Field(description="Tarea para el agente Claude Code")
# tools: list[str] = Field(default=["Read", "Bash"])
# class ClaudeCodeTool(BaseTool):
# """LangChain Tool que delega a Claude Code SDK."""
#
# name: str = "claude_code_agent"
# description: str = "Agente que puede leer archivos, ejecutar comandos y editar código"
# args_schema: Type[BaseModel] = ClaudeCodeInput
#
# def _run(self, prompt: str, tools: list[str] = ["Read", "Bash"]) -> str:
# """Versión síncrona (requerida por LangChain)."""
# return asyncio.run(self._arun(prompt, tools))
#
# async def _arun(self, prompt: str, tools: list[str] = ["Read", "Bash"]) -> str:
# """Versión asíncrona."""
# results = []
# async for message in query(
# prompt,
# options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
# ):
# if hasattr(message, 'content') and message.content:
# results.append(str(message.content))
# return "\n".join(results)
# Patrón manual sin LangChain (más simple y directo):
async def claude_as_tool(prompt: str, tools: list[str] | None = None) -> str:
"""Claude Code como herramienta reutilizable."""
if tools is None:
tools = ["Read", "Bash"]
results = []
async for message in query(
prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
9. El Futuro del SDK
Funcionalidades en Roadmap
Basado en la dirección del proyecto y las tendencias del ecosistema:
roadmap
title Roadmap del Claude Code SDK (estimado)
section Corto plazo
Mejor soporte para streaming en tiempo real : done, 2025-Q4
ClaudeSDKClient estable : active, 2026-Q1
Más opciones de configuración de herramientas : active, 2026-Q1
section Mediano plazo
SDK para más lenguajes (Go, Rust, Java) : 2026-Q2
Built-in rate limiting y retry : 2026-Q2
Soporte para modelos Haiku/Sonnet específicos : 2026-Q3
section Largo plazo
Orquestación multi-agente nativa : 2026-Q4
Memoria persistente integrada : 2027-Q1
Evaluación automática de calidad : 2027-Q2
MCP y el Ecosistema Creciente
flowchart TB
subgraph MCPEcosystem["Ecosistema MCP (Model Context Protocol)"]
SDK["Claude Code SDK"] --> |"se conecta a"| MCP["MCP Servers"]
subgraph MCPServers["MCP Servers disponibles"]
direction LR
DB["Bases de datos\n(PostgreSQL, SQLite)"]
Git["Git\n(GitHub, GitLab)"]
Cloud["Cloud\n(AWS, GCP, Azure)"]
Custom["Custom\n(tu empresa)"]
end
MCP --> DB
MCP --> Git
MCP --> Cloud
MCP --> Custom
end
El ecosistema MCP está creciendo rápidamente. En 2026, hay cientos de MCP servers disponibles que extienden las capacidades del SDK sin necesidad de actualizar el SDK mismo. Ver modelcontextprotocol.io para la lista completa.
Tendencias en Agentes de IA
flowchart LR
Hoy["2026: Hoy"] --> |"Evolucionando hacia"| Futuro["2027-2028"]
subgraph Hoy
H1["Agentes que\neusan herramientas"]
H2["Supervisión\nhumana frecuente"]
H3["Tareas de\nhoras/días"]
end
subgraph Futuro
F1["Agentes que\naprender y se adaptan"]
F2["Autonomía\ncreciente"]
F3["Tareas de\nsemanas/meses"]
end
Implicaciones para el SDK:
- Las APIs de agentes se volverán más estables y con más garantías
- Los mecanismos de checkpoint y resume serán más sofisticados
- El debugging de agentes tendrá herramientas dedicadas
- La evaluación automática de outputs será parte del SDK
Resumen del Tutorial Completo
Hemos recorrido los 20 capítulos del tutorial, desde los conceptos fundamentales hasta las implementaciones avanzadas:
flowchart TB
subgraph Fundamentos["Fundamentos (Cap 1-5)"]
C1["1. Introducción"]
C2["2. Instalación"]
C3["3. Query básico"]
C4["4. Herramientas"]
C5["5. MCP"]
end
subgraph Core["Core Features (Cap 6-10)"]
C6["6. Hooks"]
C7["7. Subagentes"]
C8["8. Sesiones"]
C9["9. Casos de uso"]
C10["10. Mejores prácticas"]
end
subgraph Avanzado["Avanzado (Cap 11-15)"]
C11["11-15. Patrones avanzados\ny optimización"]
end
subgraph Produccion["Producción (Cap 16-20)"]
C16["16. Seguridad"]
C17["17. Deployment"]
C18["18. Observabilidad"]
C19["19. Multi-agente avanzado"]
C20["20. SDK Internals"]
end
Fundamentos --> Core
Core --> Avanzado
Avanzado --> Produccion
Los 5 principios que guían todo el tutorial:
- Seguridad primero: Validar inputs, limitar herramientas, no exponer secrets
- Observabilidad completa: Logs, métricas, trazas para cada query
- Resiliencia: Retry, circuit breaker, graceful degradation
- Economía: Monitorear costos, limitar tokens, cachear cuando es posible
- Mantenibilidad: Código limpio, tipos estrictos, tests automatizados
Recursos oficiales:
Volver al índice: Tutorial Completo - Índice
Apéndice: Recetas Rápidas
Esta sección concentra snippets listos para copiar y usar directamente en proyectos reales.
Receta 1: Agente con Timeout Configurable
# recipe_timeout.py
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def agent_with_timeout(
prompt: str,
tools: list[str],
timeout_seconds: float = 300.0
) -> str | None:
"""
Ejecuta el agente con un timeout máximo.
Retorna None si el agente no termina a tiempo.
"""
try:
results = []
async with asyncio.timeout(timeout_seconds):
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(
allowed_tools=tools,
max_turns=50
)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
except asyncio.TimeoutError:
print(f"[Timeout] El agente excedió {timeout_seconds}s")
return None
Receta 2: Agente con Retry Exponencial
# recipe_retry.py
import asyncio
import random
from claude_code_sdk import query, ClaudeCodeOptions
async def agent_with_retry(
prompt: str,
tools: list[str],
max_retries: int = 3,
base_delay: float = 2.0
) -> str:
"""
Ejecuta el agente con retry y backoff exponencial.
Ideal para entornos con rate limiting intermitente.
"""
last_error = None
for attempt in range(max_retries):
try:
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
except Exception as e:
last_error = e
if attempt < max_retries - 1:
# Backoff exponencial con jitter
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"[Retry] Intento {attempt + 1} falló: {e}. Reintentando en {delay:.1f}s")
await asyncio.sleep(delay)
raise RuntimeError(f"Falló después de {max_retries} intentos: {last_error}")
Receta 3: Ejecutar Múltiples Agentes en Paralelo con Semáforo
# recipe_parallel.py
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def parallel_agents_with_semaphore(
tasks: list[str],
tools: list[str],
max_concurrent: int = 3
) -> list[str]:
"""
Ejecuta múltiples agentes en paralelo con un límite de concurrencia.
El semáforo evita saturar la API con demasiadas solicitudes simultáneas.
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def run_one(task: str, index: int) -> tuple[int, str]:
async with semaphore:
print(f"[Parallel] Iniciando tarea {index + 1}/{len(tasks)}")
results = []
async for message in query(
prompt=task,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=20)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
print(f"[Parallel] Tarea {index + 1} completada")
return index, "\n".join(results)
# Crear todas las coroutines
coroutines = [run_one(task, i) for i, task in enumerate(tasks)]
# Ejecutar con gather (respeta el semáforo)
results_with_index = await asyncio.gather(*coroutines)
# Ordenar por índice original y retornar
sorted_results = sorted(results_with_index, key=lambda x: x[0])
return [result for _, result in sorted_results]
Receta 4: Streaming de Resultados a un Cliente HTTP (SSE)
# recipe_sse.py
"""
Transmite el output del agente a un cliente HTTP via Server-Sent Events (SSE).
Permite que el cliente vea el progreso en tiempo real.
"""
import asyncio
import json
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from claude_code_sdk import query, ClaudeCodeOptions
app = FastAPI()
async def agent_event_stream(prompt: str, tools: list[str]) -> AsyncGenerator[str, None]:
"""Genera eventos SSE del progreso del agente."""
try:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=50)
):
message_type = type(message).__name__
if message_type == "AssistantMessage":
content = str(getattr(message, 'content', ''))
if content:
event_data = json.dumps({
"type": "content",
"text": content[:500] # Truncar para SSE
})
yield f"data: {event_data}\n\n"
elif message_type == "ToolUseBlock":
tool_name = getattr(message, 'name', 'unknown')
event_data = json.dumps({
"type": "tool_call",
"tool": tool_name
})
yield f"data: {event_data}\n\n"
# Señal de fin
if hasattr(message, 'cost_usd'):
cost = getattr(message, 'cost_usd', 0) or 0
event_data = json.dumps({
"type": "complete",
"cost_usd": float(cost)
})
yield f"data: {event_data}\n\n"
except Exception as e:
error_data = json.dumps({"type": "error", "message": str(e)})
yield f"data: {error_data}\n\n"
@app.get("/agent/stream")
async def stream_agent(prompt: str):
"""Endpoint SSE para streaming del agente."""
return StreamingResponse(
agent_event_stream(prompt, ["Read", "Bash"]),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Nginx: deshabilitar buffering
}
)
Receta 5: Caché de Resultados con TTL
# recipe_cache.py
"""
Caché de resultados del agente para queries idénticas o similares.
Reduce costos evitando ejecutar el mismo análisis dos veces.
"""
import asyncio
import hashlib
import json
import time
from typing import Any
import redis.asyncio as redis
from claude_code_sdk import query, ClaudeCodeOptions
class AgentResultCache:
"""Caché de resultados con TTL en Redis."""
def __init__(self, redis_client: redis.Redis, ttl_seconds: int = 3600):
self.redis = redis_client
self.ttl = ttl_seconds
def _cache_key(self, prompt: str, tools: list[str]) -> str:
"""Genera clave de caché determinística."""
content = json.dumps({"prompt": prompt, "tools": sorted(tools)}, sort_keys=True)
return f"agent_cache:{hashlib.sha256(content.encode()).hexdigest()[:16]}"
async def get(self, prompt: str, tools: list[str]) -> str | None:
"""Obtiene resultado cacheado si existe."""
key = self._cache_key(prompt, tools)
cached = await self.redis.get(key)
if cached:
data = json.loads(cached)
print(f"[Cache] HIT para key {key[:20]}...")
return data["result"]
return None
async def set(self, prompt: str, tools: list[str], result: str) -> None:
"""Guarda resultado en caché."""
key = self._cache_key(prompt, tools)
await self.redis.setex(
key,
self.ttl,
json.dumps({"result": result, "cached_at": time.time()})
)
print(f"[Cache] MISS guardado para key {key[:20]}...")
async def run_with_cache(
self,
prompt: str,
tools: list[str] | None = None
) -> str:
"""Ejecuta el agente con caché automático."""
if tools is None:
tools = ["Read", "Bash"]
# Intentar caché primero
cached = await self.get(prompt, tools)
if cached:
return cached
# Ejecutar agente
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
result = "\n".join(results)
# Guardar en caché solo si el resultado no es vacío
if result.strip():
await self.set(prompt, tools, result)
return result
Receta 6: Agente con Circuit Breaker
# recipe_circuit_breaker.py
"""
Circuit breaker para el agente: si hay muchos fallos consecutivos,
abre el circuito temporalmente para evitar saturar la API.
"""
import asyncio
import time
from enum import Enum
from claude_code_sdk import query, ClaudeCodeOptions
class CircuitState(str, Enum):
CLOSED = "closed" # Normal: peticiones pasan
OPEN = "open" # Fallo: peticiones rechazadas
HALF_OPEN = "half_open" # Prueba: una petición de prueba
class AgentCircuitBreaker:
"""Circuit breaker para proteger contra fallos en cascada."""
def __init__(
self,
failure_threshold: int = 5,
reset_timeout: float = 60.0,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.success_threshold = success_threshold
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: float | None = None
@property
def state(self) -> CircuitState:
"""Verifica si el circuito debe pasar de OPEN a HALF_OPEN."""
if (
self._state == CircuitState.OPEN and
self._last_failure_time is not None and
time.time() - self._last_failure_time > self.reset_timeout
):
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def _on_success(self) -> None:
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
print("[CircuitBreaker] Circuito cerrado (recuperado)")
self._state = CircuitState.CLOSED
self._failure_count = 0
elif self._state == CircuitState.CLOSED:
self._failure_count = 0
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
print(f"[CircuitBreaker] Circuito ABIERTO después de {self._failure_count} fallos")
self._state = CircuitState.OPEN
async def call(self, prompt: str, tools: list[str]) -> str:
"""Ejecuta el agente con protección de circuit breaker."""
if self.state == CircuitState.OPEN:
raise RuntimeError("Circuit breaker ABIERTO: servicio no disponible temporalmente")
try:
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=tools, max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
self._on_success()
return "\n".join(results)
except Exception as e:
self._on_failure()
raise
Notas Finales
El Claude Code SDK es una herramienta en evolución activa. Algunas consideraciones finales para trabajar con él a largo plazo:
-
Versionar el SDK: Pin a versiones específicas en producción (
claude-code-sdk==0.0.14). El SDK puede tener cambios breaking entre versiones menores mientras está en la fase0.x. -
Probar contra el CLI real: El SDK depende del CLI
claudeinstalado localmente. En CI/CD, asegúrate de que el CLI esté instalado en el environment de build. Usarnpm install -g @anthropic-ai/claude-codeen el Dockerfile o en los steps de CI. -
Costs en producción: El
cost_usdque reporta el SDK es el costo de los tokens de la API. Los costos reales pueden diferir ligeramente por promociones, descuentos por volumen o actualizaciones de pricing. Siempre confirmar con el dashboard de Anthropic. -
Session IDs son efímeros: Los
session_idson específicos a la instalación del CLI y la instancia de Docker. No son portables entre máquinas ni entre versiones del CLI. Usar solo para reanudar dentro de la misma sesión activa. -
Seguridad ante todo: Nunca hardcodear
ANTHROPIC_API_KEY. Usar variables de entorno o secrets managers. El SDK la lee automáticamente deANTHROPIC_API_KEYen el environment.
Volver al índice: Tutorial Completo - Índice
10. Protocolo JSON Lines en Detalle
Cada Mensaje es una Línea JSON
El protocolo de comunicación entre el SDK y el proceso claude -p es JSON Lines: cada mensaje es un objeto JSON completo en una sola línea, separados por \n. Esto es diferente de JSON streaming (como NDJSON o JSON streaming de LLMs) porque cada línea es un documento JSON completo y auto-contenido.
sequenceDiagram
participant App as Tu Aplicación
participant SDK as SDK query()
participant Proc as claude -p (subprocess)
participant API as Anthropic API
App->>SDK: query(prompt, options)
SDK->>Proc: spawn(["claude", "-p", "--output-format", "stream-json"])
SDK->>Proc: stdin.write(prompt_json + "\n")
SDK->>Proc: stdin.close()
Proc->>API: POST /v1/messages (HTTPS)
API-->>Proc: stream de tokens
Proc-->>SDK: stdout: {"type":"system","subtype":"init",...}\n
SDK-->>App: yield SystemMessage
Proc-->>SDK: stdout: {"type":"assistant","message":{...}}\n
SDK-->>App: yield AssistantMessage
Proc-->>SDK: stdout: {"type":"tool","tool_use_id":"tu_001",...}\n
SDK-->>App: yield ToolResultMessage
Proc-->>SDK: stdout: {"type":"result","subtype":"success",...}\n
SDK-->>App: yield ResultMessage
Proc-->>SDK: EOF en stdout
SDK->>SDK: await process.wait()
SDK-->>App: StopAsyncIteration
Schema Completo de Cada Tipo de Mensaje
Mensaje tipo system — Inicialización
{
"type": "system",
"subtype": "init",
"session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"tools": [
{
"name": "Read",
"description": "Read a file from the filesystem",
"input_schema": {
"type": "object",
"properties": {
"file_path": { "type": "string", "description": "Absolute path to file" },
"offset": { "type": "integer", "description": "Line number to start from" },
"limit": { "type": "integer", "description": "Number of lines to read" }
},
"required": ["file_path"]
}
},
{
"name": "Bash",
"description": "Execute a bash command",
"input_schema": {
"type": "object",
"properties": {
"command": { "type": "string", "description": "The bash command to execute" },
"timeout": { "type": "integer", "description": "Timeout in milliseconds" }
},
"required": ["command"]
}
}
],
"mcp_servers": [],
"model": "claude-opus-4-5",
"permissionMode": "default",
"apiKeySource": "environment"
}
Mensaje tipo assistant — Respuesta del LLM
{
"type": "assistant",
"message": {
"id": "msg_01XFDUDYJgAACzvnptvVoYEL",
"type": "message",
"role": "assistant",
"content": [
{
"type": "text",
"text": "Voy a analizar el archivo auth.py para encontrar bugs de seguridad."
},
{
"type": "tool_use",
"id": "toolu_01A09q90qw90lq917835lq9",
"name": "Read",
"input": {
"file_path": "/home/user/project/auth.py",
"offset": 1,
"limit": 100
}
}
],
"model": "claude-opus-4-5",
"stop_reason": "tool_use",
"stop_sequence": null,
"usage": {
"input_tokens": 1247,
"output_tokens": 89,
"cache_creation_input_tokens": 1100,
"cache_read_input_tokens": 0
}
}
}
Mensaje tipo user — Resultado de Herramienta
Este mensaje lo genera el CLI cuando ejecuta la herramienta y empaqueta el resultado para devolverlo al LLM:
{
"type": "user",
"message": {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "toolu_01A09q90qw90lq917835lq9",
"content": " 1→import hashlib\n 2→import secrets\n 3→import bcrypt\n 4→\n 5→def authenticate(username: str, password: str) -> bool:\n 6→ # BUG: comparación directa de strings (timing attack)\n 7→ stored = get_password_hash(username)\n 8→ return stored == password\n",
"is_error": false
}
]
}
}
Mensaje tipo result — Resultado Final
{
"type": "result",
"subtype": "success",
"is_error": false,
"result": "Encontré 2 vulnerabilidades de seguridad en auth.py:\n\n1. **Timing Attack** (línea 8): La comparación `stored == password` es vulnerable...\n\n2. **Hash débil** (línea 15): Se usa MD5 para hashear passwords...",
"session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"cost_usd": 0.02341,
"duration_ms": 12543,
"num_turns": 4,
"usage": {
"input_tokens": 5823,
"output_tokens": 342,
"cache_creation_input_tokens": 4100,
"cache_read_input_tokens": 1200
}
}
Mensaje tipo result con error
{
"type": "result",
"subtype": "error",
"is_error": true,
"error": "max_turns_exceeded",
"result": "",
"session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"cost_usd": 0.045,
"duration_ms": 89234,
"num_turns": 50
}
Cómo el SDK Parsea y Valida Cada Mensaje
# sdk_parser_explained.py
"""
Cómo el SDK convierte líneas JSON en objetos Python tipados.
Este código es educativo — el SDK real es más complejo.
"""
import json
from typing import Any
from dataclasses import dataclass
@dataclass
class ParsedMessage:
raw_type: str
data: dict[str, Any]
def parse_sdk_line(line: str) -> ParsedMessage | None:
"""
Parsea una línea JSON del stdout del subproceso.
Retorna None si la línea está vacía o no es JSON válido.
El SDK real puede lanzar excepciones específicas en errores de parsing.
"""
stripped = line.strip()
# Líneas vacías son normales (buffering del subprocess)
if not stripped:
return None
# Intentar parsear como JSON
try:
data = json.loads(stripped)
except json.JSONDecodeError as e:
# El SDK real loggea esto como warning y continúa
# Puede ocurrir si el CLI escribe output no-JSON (ej: mensajes de error)
print(f"[WARN] Línea no-JSON ignorada: {stripped[:100]!r} — {e}")
return None
# Validar que tiene el campo 'type'
msg_type = data.get("type")
if not isinstance(msg_type, str):
print(f"[WARN] Mensaje sin 'type' ignorado: {data}")
return None
return ParsedMessage(raw_type=msg_type, data=data)
def dispatch_message(parsed: ParsedMessage) -> Any:
"""
Convierte el mensaje parseado en el objeto Python correspondiente.
El SDK real usa dataclasses o Pydantic models.
"""
match parsed.raw_type:
case "system":
return handle_system_message(parsed.data)
case "assistant":
return handle_assistant_message(parsed.data)
case "user":
return handle_user_message(parsed.data)
case "result":
return handle_result_message(parsed.data)
case _:
# Tipos desconocidos: el SDK los ignora silenciosamente
# Esto permite compatibilidad futura con nuevos tipos
print(f"[DEBUG] Tipo de mensaje desconocido: {parsed.raw_type}")
return None
def handle_result_message(data: dict) -> dict:
"""Maneja el mensaje de resultado final."""
return {
"type": "result",
"success": data.get("subtype") == "success",
"result": data.get("result", ""),
"session_id": data.get("session_id"),
"cost_usd": data.get("cost_usd"),
"duration_ms": data.get("duration_ms"),
"num_turns": data.get("num_turns"),
"usage": data.get("usage", {}),
"error": data.get("error"),
}
def handle_system_message(data: dict) -> dict:
return {"type": "system", "session_id": data.get("session_id"), "tools": data.get("tools", [])}
def handle_assistant_message(data: dict) -> dict:
return {"type": "assistant", "message": data.get("message", {})}
def handle_user_message(data: dict) -> dict:
return {"type": "user", "message": data.get("message", {})}
Capturar el Protocolo Raw para Debugging
# capture_raw_protocol.py
"""
Captura el protocolo raw JSON Lines para debugging.
Útil cuando el SDK se comporta de forma inesperada.
"""
import asyncio
import json
import sys
from datetime import datetime
async def capture_protocol_session(
prompt: str,
output_file: str = "/tmp/claude_protocol_capture.jsonl"
) -> None:
"""
Spawna claude -p directamente y captura todos los mensajes raw.
Los guarda en un archivo JSONL para análisis posterior.
"""
process = await asyncio.create_subprocess_exec(
"claude", "-p",
"--output-format", "stream-json",
"--verbose",
"--allowedTools", "Read,Bash",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Enviar prompt
process.stdin.write(prompt.encode() + b"\n")
await process.stdin.drain()
process.stdin.close()
messages_captured = []
async def capture_stdout():
async for raw_line in process.stdout:
decoded = raw_line.decode().strip()
if decoded:
entry = {
"timestamp": datetime.utcnow().isoformat(),
"source": "stdout",
"raw": decoded,
}
try:
entry["parsed"] = json.loads(decoded)
entry["type"] = entry["parsed"].get("type", "unknown")
except json.JSONDecodeError:
entry["parse_error"] = True
messages_captured.append(entry)
# Imprimir en tiempo real para debugging
msg_type = entry.get("type", "?")
print(f"[{msg_type:12s}] {decoded[:120]}")
async def capture_stderr():
async for raw_line in process.stderr:
decoded = raw_line.decode().strip()
if decoded:
print(f"[STDERR] {decoded}", file=sys.stderr)
await asyncio.gather(capture_stdout(), capture_stderr())
await process.wait()
# Guardar captura
with open(output_file, "w") as f:
for entry in messages_captured:
f.write(json.dumps(entry) + "\n")
print(f"\n[CAPTURE] {len(messages_captured)} mensajes guardados en {output_file}")
print(f"[CAPTURE] Exit code: {process.returncode}")
# Uso:
# asyncio.run(capture_protocol_session("Lista los archivos Python"))
# Luego analizar: cat /tmp/claude_protocol_capture.jsonl | jq '.type'
TypeScript: Parsear el Protocolo Manualmente
// protocol-parser.ts
import { spawn } from "node:child_process";
import * as readline from "node:readline";
interface SDKProtocolMessage {
type: "system" | "assistant" | "user" | "result" | "unknown";
raw: Record<string, unknown>;
timestamp: Date;
}
async function* captureRawProtocol(
prompt: string
): AsyncGenerator<SDKProtocolMessage> {
const process = spawn("claude", [
"-p",
"--output-format",
"stream-json",
"--allowedTools",
"Read,Bash",
]);
process.stdin.write(prompt + "\n");
process.stdin.end();
const rl = readline.createInterface({ input: process.stdout });
const messageQueue: SDKProtocolMessage[] = [];
let resolve: (() => void) | null = null;
let done = false;
rl.on("line", (line) => {
if (!line.trim()) return;
try {
const raw = JSON.parse(line) as Record<string, unknown>;
const msg: SDKProtocolMessage = {
type: (raw.type as SDKProtocolMessage["type"]) ?? "unknown",
raw,
timestamp: new Date(),
};
messageQueue.push(msg);
resolve?.();
resolve = null;
} catch {
console.error(`[PARSE ERROR] ${line.slice(0, 100)}`);
}
});
rl.on("close", () => {
done = true;
resolve?.();
});
while (!done || messageQueue.length > 0) {
if (messageQueue.length > 0) {
yield messageQueue.shift()!;
} else {
await new Promise<void>((r) => {
resolve = r;
});
}
}
}
// Uso: capturar y analizar el protocolo
async function analyzeProtocol(prompt: string): Promise<void> {
const stats: Record<string, number> = {};
for await (const msg of captureRawProtocol(prompt)) {
stats[msg.type] = (stats[msg.type] ?? 0) + 1;
console.log(`[${msg.timestamp.toISOString()}] type=${msg.type}`);
if (msg.type === "result") {
const result = msg.raw as { cost_usd?: number; num_turns?: number };
console.log(` cost_usd: $${result.cost_usd?.toFixed(4)}`);
console.log(` num_turns: ${result.num_turns}`);
}
}
console.log("\nMessage type distribution:", stats);
}
export { captureRawProtocol, analyzeProtocol, SDKProtocolMessage };
11. Implementar un SDK desde Cero
Esta sección es educativa: implementar query() mínimo desde cero ayuda a entender el SDK real y a debuggear cuando algo falla.
Implementación Mínima Funcional en Python
# minimal_sdk.py
"""
Implementación mínima y funcional de query() desde cero.
Ayuda a entender el SDK real y a debuggear.
NO usar en producción — el SDK oficial tiene:
- Mejor manejo de errores
- Type safety
- Cancelación limpia
- Manejo de MCP
- Reconexión
"""
import asyncio
import json
import os
import shutil
from typing import AsyncGenerator, Any
class MinimalSDKError(Exception):
"""Error del SDK mínimo."""
pass
class MinimalQueryOptions:
"""Opciones simplificadas para el SDK mínimo."""
def __init__(
self,
allowed_tools: list[str] | None = None,
max_turns: int = 50,
system_prompt: str | None = None,
model: str | None = None,
):
self.allowed_tools = allowed_tools or []
self.max_turns = max_turns
self.system_prompt = system_prompt
self.model = model
async def minimal_query(
prompt: str,
options: MinimalQueryOptions | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
"""
Implementación mínima de query().
Demuestra el mecanismo fundamental:
1. Spawnar claude -p con los flags correctos
2. Enviar prompt por stdin
3. Leer JSON Lines de stdout
4. Parsear y hacer yield de cada mensaje
5. Manejar errores del proceso
"""
opts = options or MinimalQueryOptions()
# 1. Construir el comando
cmd = [
"claude",
"-p",
"--output-format", "stream-json",
"--max-turns", str(opts.max_turns),
]
for tool in opts.allowed_tools:
cmd.extend(["--allowedTools", tool])
if opts.system_prompt:
cmd.extend(["--system-prompt", opts.system_prompt])
if opts.model:
cmd.extend(["--model", opts.model])
# 2. Verificar que claude está instalado
if not shutil.which("claude"):
raise MinimalSDKError(
"claude CLI no encontrado. Instalar con: npm install -g @anthropic-ai/claude-code"
)
# 3. Spawnar el subproceso
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={
**os.environ,
"CLAUDE_CODE_NONINTERACTIVE": "1",
}
)
except FileNotFoundError:
raise MinimalSDKError("claude CLI no encontrado en PATH")
# 4. Enviar el prompt por stdin
try:
process.stdin.write(prompt.encode("utf-8") + b"\n")
await process.stdin.drain()
process.stdin.close()
except BrokenPipeError:
stderr_output = await process.stderr.read()
raise MinimalSDKError(
f"El proceso falló antes de recibir el prompt: {stderr_output.decode()[:500]}"
)
# 5. Leer y parsear JSON Lines del stdout
parse_errors = 0
messages_received = 0
async for raw_line in process.stdout:
line = raw_line.decode("utf-8", errors="replace").strip()
if not line:
continue
try:
message = json.loads(line)
messages_received += 1
if not isinstance(message, dict) or "type" not in message:
continue
yield message
except json.JSONDecodeError:
parse_errors += 1
if parse_errors > 10:
raise MinimalSDKError(
f"Demasiados errores de parsing JSON ({parse_errors}). "
f"Última línea: {line[:200]!r}"
)
continue
# 6. Verificar el código de salida del proceso
return_code = await process.wait()
if return_code != 0:
stderr_output = b""
try:
stderr_output = await asyncio.wait_for(process.stderr.read(), timeout=2.0)
except asyncio.TimeoutError:
pass
raise MinimalSDKError(
f"claude CLI terminó con código {return_code}. "
f"stderr: {stderr_output.decode()[:500]}"
)
if messages_received == 0:
raise MinimalSDKError(
"claude CLI no produjo ningún mensaje. "
"Verificar que ANTHROPIC_API_KEY esté configurado."
)
async def demo_minimal_sdk():
"""Demostración del SDK mínimo."""
print("Ejecutando query con SDK mínimo...")
async for message in minimal_query(
prompt="Lista los archivos Python en el directorio actual",
options=MinimalQueryOptions(
allowed_tools=["Bash"],
max_turns=10,
)
):
msg_type = message.get("type", "?")
if msg_type == "assistant":
content = message.get("message", {}).get("content", [])
for block in content:
if block.get("type") == "text":
print(f"[ASSISTANT] {block['text'][:200]}")
elif block.get("type") == "tool_use":
print(f"[TOOL CALL] {block['name']}: {str(block['input'])[:100]}")
elif msg_type == "result":
cost = message.get("cost_usd", 0)
turns = message.get("num_turns", 0)
print(f"\n[DONE] {turns} turnos, costo: ${cost:.4f}")
Implementación Mínima en TypeScript
// minimal-sdk.ts
import { spawn } from "node:child_process";
import * as readline from "node:readline";
interface MinimalOptions {
allowedTools?: string[];
maxTurns?: number;
systemPrompt?: string;
model?: string;
}
interface SDKMessage {
type: string;
[key: string]: unknown;
}
async function* minimalQuery(
prompt: string,
options: MinimalOptions = {}
): AsyncGenerator<SDKMessage> {
const { allowedTools = [], maxTurns = 50, systemPrompt, model } = options;
const args = ["-p", "--output-format", "stream-json", "--max-turns", String(maxTurns)];
for (const tool of allowedTools) args.push("--allowedTools", tool);
if (systemPrompt) args.push("--system-prompt", systemPrompt);
if (model) args.push("--model", model);
const proc = spawn("claude", args, {
stdio: ["pipe", "pipe", "pipe"],
env: { ...process.env, CLAUDE_CODE_NONINTERACTIVE: "1" },
});
proc.stdin.write(prompt + "\n");
proc.stdin.end();
const rl = readline.createInterface({ input: proc.stdout, crlfDelay: Infinity });
const queue: SDKMessage[] = [];
const waiters: Array<() => void> = [];
let finished = false;
rl.on("line", (line) => {
const trimmed = line.trim();
if (!trimmed) return;
try {
const msg = JSON.parse(trimmed) as SDKMessage;
if (waiters.length > 0) {
queue.push(msg);
waiters.shift()!();
} else {
queue.push(msg);
}
} catch {
// Ignorar líneas no-JSON
}
});
rl.on("close", () => {
finished = true;
waiters.forEach((r) => r());
waiters.length = 0;
});
while (!finished || queue.length > 0) {
if (queue.length > 0) {
yield queue.shift()!;
} else if (!finished) {
await new Promise<void>((resolve) => {
waiters.push(resolve);
});
}
}
}
export { minimalQuery, MinimalOptions, SDKMessage };
Diferencia entre stdin.close() y stdin Persistente
# stdin_modes.py
"""
Explicación de los dos modos de stdin:
1. stdin.close() inmediato: para queries únicas (modo actual del SDK)
2. stdin persistente: para sesiones multi-turn sin reiniciar el proceso (hipotético)
"""
import asyncio
import json
async def single_query_mode(prompt: str):
"""
Modo actual del SDK: stdin se cierra después del primer prompt.
El proceso termina después de completar el query.
"""
process = await asyncio.create_subprocess_exec(
"claude", "-p", "--output-format", "stream-json",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Enviar prompt
process.stdin.write(prompt.encode() + b"\n")
await process.stdin.drain()
process.stdin.close() # EOF: el CLI sabe que no vendrán más prompts
# Leer hasta EOF del stdout
async for line in process.stdout:
decoded = line.decode().strip()
if decoded:
msg = json.loads(decoded)
print(f"[MODE:SINGLE] {msg.get('type')}")
await process.wait()
# Proceso terminó — para el siguiente query, se spawna uno nuevo
# Por qué el SDK no reutiliza el proceso:
# 1. Cada query puede necesitar un working directory diferente
# 2. Los hooks (PreToolUse, PostToolUse) pueden tener estado que no debería persistir
# 3. El estado del CLI (historial de conversación) se resetea limpiamente
# 4. Facilita el manejo de errores: si el proceso muere, solo afecta una query
12. Monkey Patching y Extensión en Runtime
Interceptar query() para Logging Automático
# monkey_patching.py
"""
Monkey patching del SDK sin modificar su código fuente.
Útil para agregar comportamiento cross-cutting (logging, metrics, tracing)
sin cambiar cada llamada a query().
"""
import time
from typing import AsyncGenerator, Any
import claude_code_sdk
from claude_code_sdk import ClaudeCodeOptions
# Guardar referencia al original antes de patchear
_original_query = claude_code_sdk.query
async def _instrumented_query(
prompt: str,
options: ClaudeCodeOptions | None = None,
**kwargs
) -> AsyncGenerator[Any, None]:
"""
Versión instrumentada de query() que agrega logging automático.
"""
start = time.monotonic()
tool_calls = []
print(f"[INTERCEPT] query() iniciado — prompt: {prompt[:80]!r}")
async for message in _original_query(prompt, options, **kwargs):
msg_type = type(message).__name__
if msg_type == "ToolUseBlock":
tool_calls.append(getattr(message, 'name', 'unknown'))
yield message
duration = time.monotonic() - start
print(f"[INTERCEPT] query() completado — {duration:.2f}s, tools: {tool_calls}")
def install_query_interceptor():
"""Instala el interceptor globalmente."""
claude_code_sdk.query = _instrumented_query
print("[INTERCEPT] Interceptor instalado")
def uninstall_query_interceptor():
"""Restaura query() original."""
claude_code_sdk.query = _original_query
print("[INTERCEPT] Interceptor desinstalado")
# Context manager para interceptación temporal
from contextlib import asynccontextmanager
@asynccontextmanager
async def with_query_interceptor(interceptor_fn):
"""
Context manager que instala y desinstala un interceptor.
Uso:
async with with_query_interceptor(my_interceptor):
result = await run_agent(...)
"""
original = claude_code_sdk.query
claude_code_sdk.query = interceptor_fn
try:
yield
finally:
claude_code_sdk.query = original
Decorators para Agregar Comportamiento
# sdk_decorators.py
"""
Decorators para agregar comportamiento a funciones que usan el SDK.
Principio Open/Closed: extender sin modificar.
"""
import asyncio
import functools
from typing import Callable
def with_timeout(timeout_seconds: float):
"""Agrega timeout a cualquier función async que use el SDK."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_seconds)
except asyncio.TimeoutError:
raise TimeoutError(f"{func.__name__} excedió {timeout_seconds}s")
return wrapper
return decorator
def with_retry(max_attempts: int = 3, delay_seconds: float = 2.0):
"""Agrega retry exponencial."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
if attempt < max_attempts - 1:
wait = delay_seconds * (2 ** attempt)
print(f"[RETRY] Intento {attempt + 1}: {e}. Esperando {wait:.1f}s")
await asyncio.sleep(wait)
raise RuntimeError(f"Falló tras {max_attempts} intentos: {last_error}")
return wrapper
return decorator
def with_cost_limit(max_cost_usd: float):
"""Para la ejecución si el costo supera el límite."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
import claude_code_sdk
from claude_code_sdk import query as original_query
total_cost = 0.0
async def cost_monitoring_query(prompt, options=None, **kw):
nonlocal total_cost
async for msg in original_query(prompt, options, **kw):
if hasattr(msg, 'cost_usd') and msg.cost_usd:
total_cost += float(msg.cost_usd)
if total_cost > max_cost_usd:
raise RuntimeError(
f"Costo excedido: ${total_cost:.4f} > ${max_cost_usd}"
)
yield msg
original = claude_code_sdk.query
claude_code_sdk.query = cost_monitoring_query
try:
return await func(*args, **kwargs)
finally:
claude_code_sdk.query = original
return wrapper
return decorator
# Uso combinado de decorators
@with_timeout(300)
@with_retry(max_attempts=3)
@with_cost_limit(max_cost_usd=0.50)
async def production_agent(prompt: str) -> str:
from claude_code_sdk import query, ClaudeCodeOptions
results = []
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"], max_turns=30)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
TypeScript: SDK Wrapper con Proxy
// sdk-proxy.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
type QueryFunction = typeof query;
type QueryMiddleware = (
prompt: string,
options: ClaudeCodeOptions | undefined,
next: QueryFunction
) => ReturnType<QueryFunction>;
function createQueryProxy(middlewares: QueryMiddleware[]): QueryFunction {
return async function* proxiedQuery(prompt: string, options?: ClaudeCodeOptions) {
const buildNext = (i: number): QueryFunction => {
if (i < 0) return query;
return (p, o) => middlewares[i](p, o, buildNext(i - 1));
};
yield* buildNext(middlewares.length - 1)(prompt, options);
};
}
const loggingMiddleware: QueryMiddleware = async function* (prompt, options, next) {
const start = Date.now();
console.log(`[LOG] query started: ${prompt.slice(0, 60)}`);
try {
yield* next(prompt, options);
} finally {
console.log(`[LOG] query completed in ${Date.now() - start}ms`);
}
};
const costTrackingMiddleware = (maxCostUsd: number): QueryMiddleware =>
async function* (prompt, options, next) {
let totalCost = 0;
for await (const message of next(prompt, options)) {
if ("cost_usd" in message && typeof message.cost_usd === "number") {
totalCost += message.cost_usd;
if (totalCost > maxCostUsd) {
throw new Error(`Cost limit exceeded: $${totalCost.toFixed(4)}`);
}
}
yield message;
}
};
const instrumentedQuery = createQueryProxy([
loggingMiddleware,
costTrackingMiddleware(1.00),
]);
export { instrumentedQuery, createQueryProxy, QueryMiddleware };
13. Async Internals — Cómo el SDK Maneja Concurrencia
Por Qué asyncio en Lugar de threading
flowchart LR
subgraph AsyncIO["asyncio (usado por el SDK)"]
A1["Single thread\nno race conditions"]
A2["Non-blocking I/O\nevent loop eficiente"]
A3["Cancelación limpia\nCancelledError propagable"]
A4["Composición natural\nasyncio.gather()"]
end
subgraph Threading["threading (no usado)"]
T1["Race conditions\nnecesita locks"]
T2["Blocking I/O\nhilos bloqueados en readline()"]
T3["Cancelación difícil\nno se puede matar un thread"]
T4["Más overhead\ncambio de contexto del OS"]
end
Flujo Interno del event loop al Leer Mensajes
# async_internals_deep.py
"""
Explicación profunda de cómo asyncio maneja el subproceso.
"""
import asyncio
import json
async def understand_async_subprocess():
"""
Demostración de cómo asyncio lee del subproceso sin bloquear.
"""
# asyncio.create_subprocess_exec registra el file descriptor del pipe
# en el event loop. Cuando hay datos disponibles, el OS notifica al
# event loop via epoll/kqueue/IOCP (según el OS).
process = await asyncio.create_subprocess_exec(
"echo", '{"type":"result","subtype":"success","result":"hola"}',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Cuando llamamos `async for line in process.stdout`:
# 1. El event loop pregunta al OS: "¿hay datos en el fd del pipe?"
# 2. Si hay datos: los retorna inmediatamente
# 3. Si no hay datos: suspende esta coroutine y ejecuta otras
# (no bloquea el thread completo)
# 4. Cuando el OS notifica que hay datos, reanuda esta coroutine
async for line in process.stdout:
decoded = line.decode().strip()
if decoded:
msg = json.loads(decoded)
print(f"Recibido: {msg}")
# Mientras procesamos este mensaje, el event loop puede
# estar ejecutando otras coroutines en paralelo
await process.wait()
# ============================================================
# asyncio.Queue para buffering entre lector y consumidor
# ============================================================
async def producer_consumer_pattern(prompt: str):
"""
Separar lectura y procesamiento con asyncio.Queue.
El productor lee líneas del proceso.
El consumidor procesa mensajes.
La queue hace de buffer entre ambos.
"""
import os
queue: asyncio.Queue[dict | None] = asyncio.Queue(maxsize=50)
process = await asyncio.create_subprocess_exec(
"claude", "-p", "--output-format", "stream-json",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={**os.environ}
)
process.stdin.write(prompt.encode() + b"\n")
await process.stdin.drain()
process.stdin.close()
# PRODUCTOR: lee del proceso y pone en la queue
async def producer():
async for raw_line in process.stdout:
line = raw_line.decode().strip()
if not line:
continue
try:
msg = json.loads(line)
await queue.put(msg)
# Si la queue está llena (maxsize=50), esta línea
# bloquea al productor — esto es backpressure natural.
# El proceso padre se ralentiza, dando tiempo al consumidor.
except json.JSONDecodeError:
pass
await queue.put(None) # Señal de fin
# CONSUMIDOR: procesa mensajes de la queue
async def consumer():
results = []
while True:
msg = await queue.get()
if msg is None:
break
# Procesamiento potencialmente lento sin bloquear al productor
await asyncio.sleep(0) # yield al event loop
if msg.get("type") == "result":
results.append(msg.get("result", ""))
queue.task_done()
return results
# Ejecutar productor y consumidor concurrentemente
producer_task = asyncio.create_task(producer())
results = await consumer()
await producer_task
return results
# ============================================================
# Cancelación limpia
# ============================================================
async def cancellable_agent(prompt: str) -> str | None:
"""
Agente con soporte de cancelación vía asyncio.CancelledError.
"""
import os
process = None
try:
process = await asyncio.create_subprocess_exec(
"claude", "-p", "--output-format", "stream-json",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={**os.environ}
)
process.stdin.write(prompt.encode() + b"\n")
await process.stdin.drain()
process.stdin.close()
results = []
async for raw_line in process.stdout:
line = raw_line.decode().strip()
if line:
msg = json.loads(line)
if msg.get("type") == "result":
results.append(msg.get("result", ""))
await process.wait()
return "\n".join(results)
except asyncio.CancelledError:
# Cleanup limpio cuando se cancela la tarea
if process and process.returncode is None:
print("[CANCEL] Terminando proceso claude...")
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
process.kill()
await process.wait()
raise # Re-lanzar CancelledError (importante!)
Concurrencia Real: Múltiples Queries en Paralelo
# real_concurrency.py
"""
Demostración de que múltiples query() en paralelo son realmente
concurrentes — cada uno en su propio subproceso.
"""
import asyncio
import time
from claude_code_sdk import query, ClaudeCodeOptions
async def benchmark_concurrent_vs_sequential(tasks: list[str]) -> dict:
"""
Compara ejecutar N queries secuencialmente vs concurrentemente.
Demuestra que el SDK soporta concurrencia real.
"""
# Ejecutar secuencialmente
start_seq = time.monotonic()
seq_results = []
for task in tasks:
results = []
async for message in query(
prompt=task,
options=ClaudeCodeOptions(allowed_tools=[], max_turns=2)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
seq_results.append("\n".join(results))
seq_duration = time.monotonic() - start_seq
# Ejecutar concurrentemente
start_con = time.monotonic()
async def run_one(task: str) -> str:
results = []
async for message in query(
prompt=task,
options=ClaudeCodeOptions(allowed_tools=[], max_turns=2)
):
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
return "\n".join(results)
con_results = await asyncio.gather(*[run_one(t) for t in tasks])
con_duration = time.monotonic() - start_con
speedup = seq_duration / con_duration if con_duration > 0 else 0
return {
"sequential_seconds": round(seq_duration, 2),
"concurrent_seconds": round(con_duration, 2),
"speedup": round(speedup, 2),
"tasks_count": len(tasks),
}
TypeScript: Concurrencia con Promise.all
// ts-concurrency.ts
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
async function runAllConcurrently(
tasks: string[],
maxConcurrent: number = 5
): Promise<string[]> {
const semaphore = { count: maxConcurrent };
const waitQueue: Array<() => void> = [];
function acquire(): Promise<void> {
if (semaphore.count > 0) {
semaphore.count--;
return Promise.resolve();
}
return new Promise((resolve) => waitQueue.push(resolve));
}
function release(): void {
const next = waitQueue.shift();
if (next) {
next();
} else {
semaphore.count++;
}
}
const runTask = async (task: string, idx: number): Promise<[number, string]> => {
await acquire();
try {
const results: string[] = [];
for await (const message of query(task, {
allowedTools: [],
maxTurns: 2,
} as ClaudeCodeOptions)) {
if ("content" in message) {
const content = String((message as { content: unknown }).content);
if (content) results.push(content);
}
}
return [idx, results.join("\n")];
} finally {
release();
}
};
const results = await Promise.all(tasks.map((t, i) => runTask(t, i)));
return results.sort(([a], [b]) => a - b).map(([, r]) => r);
}
export { runAllConcurrently };
14. Debugging Avanzado del SDK
Variables de Entorno para Debug
# Debug completo del SDK y CLI
export CLAUDE_CODE_DEBUG=1
export ANTHROPIC_LOG_LEVEL=debug
export OTEL_LOG_LEVEL=debug
export PYTHONASYNCIODEBUG=1 # Python: detectar coroutines bloqueadas
# Ver subprocesos que lanza el SDK (Node.js)
export NODE_DEBUG=child_process
# Strace en Linux para ver syscalls del proceso claude
strace -f -e trace=read,write,open,close -p $(pgrep -f "claude -p") 2>&1 | head -200
Capturar stdin/stdout del Subproceso
# subprocess_inspector.py
"""
Inspección completa del subproceso: qué entra y qué sale.
"""
import asyncio
import json
import time
from pathlib import Path
class SubprocessInspector:
"""
Wrapper transparente que registra todo lo que entra y sale del proceso claude.
"""
def __init__(self, capture_dir: str = "/tmp/claude_inspector"):
self.capture_dir = Path(capture_dir)
self.capture_dir.mkdir(exist_ok=True)
self.session_id = str(int(time.time()))
async def run(self, prompt: str, cli_args: list[str] | None = None) -> None:
"""Ejecuta claude con inspección completa."""
args = cli_args or ["-p", "--output-format", "stream-json"]
process = await asyncio.create_subprocess_exec(
"claude", *args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdin_log = self.capture_dir / f"{self.session_id}_stdin.txt"
stdin_log.write_text(prompt + "\n")
print(f"[INSPECTOR] stdin → {stdin_log}")
process.stdin.write(prompt.encode() + b"\n")
await process.stdin.drain()
process.stdin.close()
stdout_lines = []
stderr_lines = []
async def capture_stdout():
async for line in process.stdout:
decoded = line.decode()
stdout_lines.append(decoded)
stripped = decoded.strip()
if stripped:
try:
msg = json.loads(stripped)
print(f"[STDOUT] type={msg.get('type', '?')}")
except Exception:
print(f"[STDOUT-RAW] {stripped[:80]}")
async def capture_stderr():
async for line in process.stderr:
decoded = line.decode()
stderr_lines.append(decoded)
if decoded.strip():
print(f"[STDERR] {decoded.strip()[:120]}")
await asyncio.gather(capture_stdout(), capture_stderr())
returncode = await process.wait()
stdout_log = self.capture_dir / f"{self.session_id}_stdout.jsonl"
stdout_log.write_text("".join(stdout_lines))
stderr_log = self.capture_dir / f"{self.session_id}_stderr.txt"
stderr_log.write_text("".join(stderr_lines))
print(f"\n[INSPECTOR] Exit code: {returncode}")
print(f" stdout → {stdout_log}")
print(f" stderr → {stderr_log}")
# Análisis rápido
type_counts: dict[str, int] = {}
total_cost = 0.0
for line in stdout_lines:
try:
msg = json.loads(line.strip())
t = msg.get("type", "unknown")
type_counts[t] = type_counts.get(t, 0) + 1
if t == "result":
total_cost += float(msg.get("cost_usd") or 0)
except Exception:
pass
print(f" Tipos de mensajes: {type_counts}")
print(f" Costo total: ${total_cost:.4f}")
Técnicas para Debuggear Agentes Colgados
# debug_hanging.py
"""
Técnicas para debuggear agentes que parecen haberse colgado.
"""
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
async def agent_with_heartbeat(
prompt: str,
heartbeat_interval: float = 30.0
) -> str:
"""
Ejecuta el agente con heartbeat que muestra que está vivo.
Si no hay mensajes en heartbeat_interval segundos,
puede indicar que el agente está esperando al LLM o atascado.
"""
results = []
last_message_time = asyncio.get_event_loop().time()
async def heartbeat():
while True:
await asyncio.sleep(heartbeat_interval)
elapsed = asyncio.get_event_loop().time() - last_message_time
print(
f"[HEARTBEAT] Sin mensajes por {elapsed:.0f}s. "
f"¿Esperando LLM? ¿Herramienta bloqueada?"
)
heartbeat_task = asyncio.create_task(heartbeat())
try:
async for message in query(
prompt=prompt,
options=ClaudeCodeOptions(allowed_tools=["Read", "Bash"], max_turns=50)
):
last_message_time = asyncio.get_event_loop().time()
msg_type = type(message).__name__
print(f"[ALIVE] {msg_type}")
if hasattr(message, 'content') and message.content:
results.append(str(message.content))
finally:
heartbeat_task.cancel()
return "\n".join(results)
async def dump_process_stack_if_hung(prompt: str, timeout_seconds: float = 60.0) -> None:
"""
En Linux: si el agente no responde en timeout_seconds,
captura el stack trace del proceso usando py-spy.
"""
import subprocess
import os
process = await asyncio.create_subprocess_exec(
"claude", "-p", "--output-format", "stream-json",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={**os.environ}
)
process.stdin.write(prompt.encode() + b"\n")
await process.stdin.drain()
process.stdin.close()
try:
async with asyncio.timeout(timeout_seconds):
async for raw_line in process.stdout:
if raw_line.strip():
print(f"[OK] {raw_line.decode().strip()[:100]}")
except asyncio.TimeoutError:
pid = process.pid
print(f"[HUNG] Proceso {pid} colgado — capturando diagnóstico")
# py-spy: stack trace del proceso Python
try:
result = subprocess.run(
["py-spy", "dump", "--pid", str(pid)],
capture_output=True, text=True, timeout=10
)
print("[PY-SPY STACK]")
print(result.stdout or result.stderr)
except (FileNotFoundError, subprocess.TimeoutExpired):
print("[WARN] py-spy no disponible")
process.kill()
await process.wait()
def reproduce_session_from_log(log_file: str) -> None:
"""
Carga una sesión de producción desde los logs y la reproduce localmente.
Permite debuggear el problema exacto que ocurrió en producción.
"""
import json
from pathlib import Path
log_path = Path(log_file)
if not log_path.exists():
print(f"[ERROR] Log file no encontrado: {log_file}")
return
sessions: dict[str, list[dict]] = {}
with open(log_path) as f:
for line in f:
try:
entry = json.loads(line.strip())
sid = entry.get("session_id", "unknown")
sessions.setdefault(sid, []).append(entry)
except json.JSONDecodeError:
pass
print(f"[REPRODUCE] Encontradas {len(sessions)} sesiones en {log_file}")
for session_id, events in list(sessions.items())[:5]:
print(f"\n Sesión: {session_id}")
for event in events:
print(f" [{event.get('event', '?')}] tool={event.get('tool', '-')} cost=${event.get('cost_usd', 0):.4f}")