Chapter 12: Testing Agents
Testing AI agents is fundamentally different from testing traditional software. An agent is non-deterministic, expensive to run, and its "correctness" is sometimes subjective. This chapter presents a pragmatic strategy for maintaining quality without running up ruinous API costs.
1. Why Is Testing Agents Hard?
Non-determinism
An agent can solve the same task along a different path on every run. It may read the README first or the source code first. It may refactor in a single step or in several. This variability makes exact assertions about the process impractical.
What we can verify is the outcome: the resulting code must compile, the tests must pass, and the file must contain the correct content.
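As a minimal sketch of an outcome-based check, the helper below verifies that a Python file left behind by the agent parses and defines the functions you expect, without asserting anything about how the agent got there. The name `assert_defines_functions` and its parameters are illustrative assumptions, not part of any SDK.

```python
import ast
from pathlib import Path


def assert_defines_functions(path: Path, expected: set[str]) -> None:
    """Check the final state of a file: it must parse and define `expected`."""
    source = path.read_text(encoding="utf-8")
    # ast.parse raises SyntaxError if the agent left broken code behind.
    tree = ast.parse(source)
    defined = {
        node.name
        for node in ast.walk(tree)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    }
    missing = expected - defined
    assert not missing, f"missing functions: {sorted(missing)}"
```

A check like this stays valid whether the agent edited the file in one step or five, which is the point of asserting on results.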
The real cost of testing
Every run of the agent against the real API consumes tokens, and tokens cost money. A full CI/CD cycle with 50 e2e tests can easily cost $10-50 per run. You need a strategy that minimizes real API calls.
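To see how a figure in that range can arise, here is a back-of-the-envelope estimator. Every number in the example call is hypothetical and chosen only to illustrate the arithmetic; substitute the token counts you measure and your provider's current prices. `Pricing` and `estimate_suite_cost` are names invented for this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pricing:
    """USD per million tokens. Values are supplied by the caller."""
    input_per_mtok: float
    output_per_mtok: float


def estimate_suite_cost(
    num_tests: int,
    input_tokens_per_test: int,
    output_tokens_per_test: int,
    pricing: Pricing,
) -> float:
    """Estimated USD cost of one full run of the suite."""
    per_test = (
        input_tokens_per_test / 1_000_000 * pricing.input_per_mtok
        + output_tokens_per_test / 1_000_000 * pricing.output_per_mtok
    )
    return num_tests * per_test


# Hypothetical figures, not real prices or measurements.
example = estimate_suite_cost(
    num_tests=50,
    input_tokens_per_test=100_000,
    output_tokens_per_test=10_000,
    pricing=Pricing(input_per_mtok=3.0, output_per_mtok=15.0),
)
```

With these assumed inputs each test costs 0.30 + 0.15 = 0.45 USD, so 50 tests come to about $22.50 per run.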
Real vs. mocked tools
The agent's tools (Bash, Read, Write) have side effects on the filesystem. A test that inadvertently runs rm -rf is catastrophic. The solution is strict isolation of the test environment.
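One low-cost layer of isolation is to run each test inside a throwaway directory. The context manager below is a sketch under that assumption; `isolated_workdir` is a hypothetical helper name. It only protects relative paths: a command that uses an absolute path can still touch the host, so treat it as a first line of defense and reach for a container or sandbox when you need a stronger guarantee.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def isolated_workdir(prefix: str = "agent_test_") -> Iterator[Path]:
    """Run a block inside a throwaway directory and always restore the cwd."""
    previous = Path.cwd()
    with tempfile.TemporaryDirectory(prefix=prefix) as tmp:
        os.chdir(tmp)
        try:
            yield Path(tmp)
        finally:
            # Leave the directory before TemporaryDirectory deletes it.
            os.chdir(previous)
```

Anything written with a relative path inside the `with` block disappears when the block exits, even if the test fails.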
Testing pyramid for agents
graph TD
A["🔺 E2E Tests<br/>10%<br/>Real API + real filesystem<br/>Few, slow, expensive"] --> B
B["🔶 Integration Tests<br/>30%<br/>Real SDK + isolated filesystem<br/>Moderate number, temp directory"] --> C
C["🟩 Unit Tests<br/>60%<br/>Everything mocked<br/>Many, fast, free"]
style A fill:#ff6b6b,color:#fff
style B fill:#ffa94d,color:#fff
style C fill:#51cf66,color:#fff
The pyramid sets the proportions: most coverage comes from unit tests, which cost nothing to run. E2E tests are few and selective.
2. Mocking the SDK
Mocking query() for unit tests
The goal is to replace the query() function with a version that returns predefined messages without calling any API:
# tests/fixtures/sdk_mocks.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from typing import AsyncGenerator, Any
from claude_code_sdk import AssistantMessage, ResultMessage
def make_text_block(text: str) -> MagicMock:
"""Crea un TextBlock mock."""
block = MagicMock()
block.text = text
block.type = "text"
del block.name # Asegurar que no tenga atributo 'name'
return block
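def make_tool_block(name: str, input_data: dict = None) -> MagicMock:
    """Create a mock ToolUseBlock."""
    block = MagicMock()
    block.name = name
    block.type = "tool_use"
    block.input = input_data or {}
    block.id = f"tool_{name}"
    # A MagicMock invents attributes on demand; delete 'text' so that
    # hasattr(block, "text") is False and the block is not mistaken for text.
    del block.text
    return block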
def make_assistant_message(*content_blocks) -> AssistantMessage:
"""Crea un AssistantMessage mock con los blocks dados."""
msg = MagicMock(spec=AssistantMessage)
msg.content = list(content_blocks)
return msg
def make_result_message(
subtype: str = "success",
duration_ms: int = 1000,
cost_usd: float = 0.001
) -> ResultMessage:
"""Crea un ResultMessage mock."""
msg = MagicMock(spec=ResultMessage)
msg.subtype = subtype
msg.duration_ms = duration_ms
msg.cost_usd = cost_usd
return msg
def make_query_response(*messages):
"""Crea un async generator que emite los mensajes dados."""
async def _generator(*args, **kwargs) -> AsyncGenerator[Any, None]:
for message in messages:
yield message
return _generator
@pytest.fixture
def mock_simple_response():
"""Mock que retorna una respuesta simple de texto."""
messages = [
make_assistant_message(
make_text_block("Analizando el código..."),
make_tool_block("Read", {"file_path": "main.py"}),
make_text_block("El código tiene 3 funciones. He analizado todo correctamente.")
),
make_result_message()
]
return make_query_response(*messages)
@pytest.fixture
def mock_error_response():
"""Mock que simula un error del agente."""
messages = [
make_assistant_message(
make_text_block("Intentando ejecutar el script..."),
make_tool_block("Bash", {"command": "python script.py"}),
make_text_block("El script falló con error de importación.")
),
make_result_message(subtype="error")
]
return make_query_response(*messages)
Using the mocks in tests
# tests/test_agent.py
import pytest
from unittest.mock import patch
from my_agent import run_code_analyzer
from tests.fixtures.sdk_mocks import (
make_assistant_message, make_text_block, make_tool_block,
make_result_message, make_query_response
)
@pytest.mark.asyncio
async def test_code_analyzer_extracts_text(mock_simple_response):
"""Verifica que el agente extrae texto correctamente."""
with patch("my_agent.query", mock_simple_response):
result = await run_code_analyzer("/fake/cwd", "Analiza main.py")
assert "3 funciones" in result.output
assert result.success is True
@pytest.mark.asyncio
async def test_code_analyzer_detects_tools_used(mock_simple_response):
"""Verifica que registramos las herramientas usadas."""
with patch("my_agent.query", mock_simple_response):
result = await run_code_analyzer("/fake/cwd", "Analiza main.py")
assert "Read" in result.tools_used
@pytest.mark.asyncio
async def test_code_analyzer_handles_error_response(mock_error_response):
"""Verifica manejo de respuesta con error."""
with patch("my_agent.query", mock_error_response):
result = await run_code_analyzer("/fake/cwd", "Ejecuta script.py")
assert "falló" in result.output.lower()
@pytest.mark.asyncio
async def test_agent_with_empty_response():
"""Verifica que el agente maneja respuesta vacía."""
empty_response = make_query_response(make_result_message())
with patch("my_agent.query", empty_response):
result = await run_code_analyzer("/fake/cwd", "Tarea vacía")
assert result is not None
assert result.output == "" or result.output is None
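The tests above import run_code_analyzer from my_agent, which the chapter does not show. As a rough sketch of what its core could look like, the function below folds a stream of SDK-like messages into one result object. `AnalyzerResult` and `collect_result` are assumed names, and the code relies on the `type` attribute that the mock blocks set; with real SDK objects you would more likely discriminate blocks with isinstance. Under these assumptions, run_code_analyzer would pass the stream returned by query() to this function.

```python
from dataclasses import dataclass, field
from typing import Any, AsyncIterator


@dataclass
class AnalyzerResult:
    output: str = ""
    tools_used: list[str] = field(default_factory=list)
    success: bool = False


async def collect_result(messages: AsyncIterator[Any]) -> AnalyzerResult:
    """Fold a stream of SDK-like messages into a single result object."""
    result = AnalyzerResult()
    texts: list[str] = []
    async for message in messages:
        # Assistant messages carry a list of content blocks.
        for block in getattr(message, "content", None) or []:
            block_type = getattr(block, "type", None)
            if block_type == "text":
                texts.append(block.text)
            elif block_type == "tool_use":
                result.tools_used.append(block.name)
        # The last message that reports a subtype decides success.
        subtype = getattr(message, "subtype", None)
        if subtype is not None:
            result.success = subtype == "success"
    result.output = "\n".join(texts)
    return result
```

Keeping the folding logic separate from the call to query() also means it can be unit-tested with any async generator, with no patching at all.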
TypeScript: vitest + vi.mock
// tests/fixtures/sdk-mocks.ts
import { vi } from "vitest";
import type {
AssistantMessage,
ResultMessage,
SDKMessage,
} from "@anthropic-ai/claude-code-sdk";
export function makeTextBlock(text: string) {
return { type: "text" as const, text };
}
export function makeToolUseBlock(name: string, input: Record<string, unknown> = {}) {
return {
type: "tool_use" as const,
id: `tool_${name}_${Date.now()}`,
name,
input,
};
}
export function makeAssistantMessage(
...content: ReturnType<typeof makeTextBlock | typeof makeToolUseBlock>[]
): AssistantMessage {
return {
type: "assistant",
message: {
id: "msg_test",
type: "message",
role: "assistant",
content,
model: "claude-opus-4-5",
stop_reason: "end_turn",
stop_sequence: null,
usage: { input_tokens: 100, output_tokens: 50 },
},
} as AssistantMessage;
}
export function makeResultMessage(overrides: Partial<ResultMessage> = {}): ResultMessage {
return {
type: "result",
subtype: "success",
duration_ms: 1000,
duration_api_ms: 800,
is_error: false,
num_turns: 1,
result: "Task completed",
session_id: "test-session",
total_cost_usd: 0.001,
usage: { input_tokens: 100, output_tokens: 50 },
...overrides,
} as ResultMessage;
}
export function makeQueryMock(messages: SDKMessage[]) {
return async function* () {
for (const message of messages) {
yield message;
}
};
}
// tests/agent.test.ts
import { describe, it, expect, vi, beforeEach } from "vitest";
import { runCodeAnalyzer } from "../src/agent";
import {
makeAssistantMessage,
makeTextBlock,
makeToolUseBlock,
makeResultMessage,
makeQueryMock,
} from "./fixtures/sdk-mocks";
vi.mock("@anthropic-ai/claude-code-sdk", () => ({
query: vi.fn(),
ClaudeCodeOptions: vi.fn(),
}));
import { query } from "@anthropic-ai/claude-code-sdk";
describe("CodeAnalyzer Agent", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("debe extraer texto de la respuesta", async () => {
const mockMessages = [
makeAssistantMessage(
makeTextBlock("Encontré 3 funciones en el código."),
makeToolUseBlock("Read", { file_path: "main.py" }),
makeTextBlock(" El código está bien estructurado.")
),
makeResultMessage(),
];
vi.mocked(query).mockImplementation(makeQueryMock(mockMessages) as any);
const result = await runCodeAnalyzer("/fake/cwd", "Analiza main.py");
expect(result.output).toContain("3 funciones");
expect(result.success).toBe(true);
});
it("debe registrar herramientas usadas", async () => {
const mockMessages = [
makeAssistantMessage(
makeToolUseBlock("Read", { file_path: "main.py" }),
makeToolUseBlock("Bash", { command: "python -m pytest" }),
makeTextBlock("Tests pasaron.")
),
makeResultMessage(),
];
vi.mocked(query).mockImplementation(makeQueryMock(mockMessages) as any);
const result = await runCodeAnalyzer("/fake/cwd", "Corre los tests");
expect(result.toolsUsed).toContain("Read");
expect(result.toolsUsed).toContain("Bash");
});
it("debe manejar respuesta con error", async () => {
const mockMessages = [
makeAssistantMessage(
makeTextBlock("Hubo un error al procesar el archivo.")
),
makeResultMessage({ subtype: "error", is_error: true }),
];
vi.mocked(query).mockImplementation(makeQueryMock(mockMessages) as any);
const result = await runCodeAnalyzer("/fake/cwd", "Procesa archivo.txt");
expect(result.success).toBe(false);
});
});
3. Unit Tests for Hooks
Testing a PreToolUse hook in isolation
Hooks are pure (or nearly pure) functions, so they can be tested without the full agent:
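# src/hooks/security_hook.py
from typing import Callable, Optional

def create_security_hook(allowed_dirs: list[str]) -> Callable[[str, dict], Optional[dict]]:
    """Hook that denies writes outside the allowed directories."""
    # Strip trailing slashes so "/proyecto" and "/proyecto/" behave the same.
    roots = [d.rstrip("/") for d in allowed_dirs]

    def pre_tool_use(tool_name: str, tool_input: dict) -> Optional[dict]:
        """Return None to continue, or a dict with 'deny' to block the call."""
        if tool_name in ("Write", "Edit", "MultiEdit"):
            path = tool_input.get("file_path", "")
            # "/proyecto/../etc/passwd" starts with an allowed prefix but escapes it.
            has_traversal = ".." in path.split("/")
            # Match on a directory boundary so "/proyecto-evil" does not pass as "/proyecto".
            inside = any(path == root or path.startswith(root + "/") for root in roots)
            if has_traversal or not inside:
                return {
                    "deny": True,
                    "reason": f"Acceso denegado a {path}"
                }
        return None

    return pre_tool_use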
# tests/test_security_hook.py
import pytest
from src.hooks.security_hook import create_security_hook
class TestSecurityHook:
@pytest.fixture
def hook(self):
return create_security_hook(allowed_dirs=["/proyecto", "/tmp"])
def test_permite_escritura_en_directorio_permitido(self, hook):
result = hook("Write", {"file_path": "/proyecto/main.py"})
assert result is None
def test_deniega_escritura_fuera_de_directorio(self, hook):
result = hook("Write", {"file_path": "/etc/passwd"})
assert result is not None
assert result["deny"] is True
assert "denegado" in result["reason"].lower()
def test_permite_lectura_en_cualquier_lugar(self, hook):
# Read no está restringido por este hook
result = hook("Read", {"file_path": "/etc/hosts"})
assert result is None
def test_deniega_edit_fuera_de_directorio(self, hook):
result = hook("Edit", {"file_path": "/home/otro/archivo.py"})
assert result is not None
assert result["deny"] is True
def test_permite_escritura_en_subdirectorio(self, hook):
result = hook("Write", {"file_path": "/proyecto/src/components/Button.tsx"})
assert result is None
def test_maneja_path_vacio(self, hook):
result = hook("Write", {"file_path": ""})
assert result is not None
assert result["deny"] is True
def test_bash_no_restringido(self, hook):
result = hook("Bash", {"command": "rm -rf /tmp/test"})
assert result is None
Testing a PostToolUse hook in isolation
# src/hooks/logging_hook.py
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ToolEvent:
tool_name: str
input: dict
output: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
success: bool = True
class ToolLogger:
def __init__(self):
self.events: list[ToolEvent] = []
def post_tool_use(self, tool_name: str, tool_input: dict, tool_output: str) -> None:
success = "error" not in tool_output.lower() and "failed" not in tool_output.lower()
event = ToolEvent(
tool_name=tool_name,
input=tool_input,
output=tool_output,
success=success
)
self.events.append(event)
def get_failed_tools(self) -> list[ToolEvent]:
return [e for e in self.events if not e.success]
def get_tool_stats(self) -> dict:
stats = {}
for event in self.events:
stats[event.tool_name] = stats.get(event.tool_name, 0) + 1
return stats
# tests/test_logging_hook.py
import pytest
from src.hooks.logging_hook import ToolLogger
class TestToolLogger:
@pytest.fixture
def logger(self):
return ToolLogger()
def test_registra_evento_exitoso(self, logger):
logger.post_tool_use("Read", {"file_path": "main.py"}, "contenido del archivo")
assert len(logger.events) == 1
assert logger.events[0].tool_name == "Read"
assert logger.events[0].success is True
def test_detecta_error_en_output(self, logger):
logger.post_tool_use(
"Bash",
{"command": "python script.py"},
"ModuleNotFoundError: No module named 'requests'"
)
assert logger.events[0].success is False
def test_get_failed_tools_retorna_solo_fallos(self, logger):
logger.post_tool_use("Read", {}, "ok content")
logger.post_tool_use("Bash", {}, "error: command not found")
logger.post_tool_use("Write", {}, "archivo escrito")
failed = logger.get_failed_tools()
assert len(failed) == 1
assert failed[0].tool_name == "Bash"
def test_estadisticas_de_herramientas(self, logger):
logger.post_tool_use("Read", {}, "ok")
logger.post_tool_use("Read", {}, "ok")
logger.post_tool_use("Bash", {}, "ok")
stats = logger.get_tool_stats()
assert stats["Read"] == 2
assert stats["Bash"] == 1
def test_maneja_output_vacio(self, logger):
logger.post_tool_use("Read", {}, "")
assert len(logger.events) == 1
assert logger.events[0].success is True
4. Unit Tests for MCP Tools
Testing custom tools in isolation
# src/tools/database_tool.py
from dataclasses import dataclass
from typing import Any
@dataclass
class DatabaseTool:
"""Herramienta MCP que consulta una base de datos."""
db_url: str
_connection = None
async def connect(self):
# Conexión real
import asyncpg
self._connection = await asyncpg.connect(self.db_url)
async def query_table(self, table: str, limit: int = 10) -> list[dict]:
if not self._connection:
raise RuntimeError("No hay conexión a la base de datos")
rows = await self._connection.fetch(
f"SELECT * FROM {table} LIMIT $1", limit
)
return [dict(row) for row in rows]
async def count_rows(self, table: str) -> int:
if not self._connection:
raise RuntimeError("No hay conexión a la base de datos")
row = await self._connection.fetchrow(f"SELECT COUNT(*) FROM {table}")
return row["count"]
# tests/test_database_tool.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.tools.database_tool import DatabaseTool
class TestDatabaseTool:
@pytest.fixture
def mock_connection(self):
conn = AsyncMock()
return conn
@pytest.fixture
def tool(self, mock_connection):
t = DatabaseTool(db_url="postgresql://test")
t._connection = mock_connection
return t
@pytest.mark.asyncio
async def test_query_table_retorna_filas(self, tool, mock_connection):
mock_connection.fetch.return_value = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
]
result = await tool.query_table("users", limit=10)
assert len(result) == 2
assert result[0]["name"] == "Alice"
mock_connection.fetch.assert_called_once_with(
"SELECT * FROM users LIMIT $1", 10
)
@pytest.mark.asyncio
async def test_query_table_respeta_limite(self, tool, mock_connection):
mock_connection.fetch.return_value = []
await tool.query_table("orders", limit=5)
mock_connection.fetch.assert_called_with("SELECT * FROM orders LIMIT $1", 5)
@pytest.mark.asyncio
async def test_falla_sin_conexion(self):
tool = DatabaseTool(db_url="postgresql://test")
# Sin conexión activa
with pytest.raises(RuntimeError, match="No hay conexión"):
await tool.query_table("users")
@pytest.mark.asyncio
async def test_count_rows(self, tool, mock_connection):
mock_connection.fetchrow.return_value = {"count": 42}
count = await tool.count_rows("products")
assert count == 42
@pytest.mark.asyncio
async def test_maneja_tabla_vacia(self, tool, mock_connection):
mock_connection.fetch.return_value = []
result = await tool.query_table("empty_table")
assert result == []
@pytest.mark.asyncio
async def test_maneja_error_de_red(self, tool, mock_connection):
mock_connection.fetch.side_effect = ConnectionError("Network unreachable")
with pytest.raises(ConnectionError):
await tool.query_table("users")
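One thing these tests do not cover: query_table and count_rows interpolate the table name directly into the SQL string, and a table name cannot be passed as a bound parameter like $1. If the agent controls that argument, it is worth validating before use. The helper below is a minimal sketch of one way to do that; `safe_table_name` and its optional allow-list are assumptions for illustration, not part of the DatabaseTool shown above.

```python
import re
from typing import Optional

# Conservative pattern: a letter or underscore, then letters, digits, underscores.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def safe_table_name(table: str, allowed: Optional[frozenset] = None) -> str:
    """Return `table` unchanged if it is safe to interpolate, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"invalid table name: {table!r}")
    if allowed is not None and table not in allowed:
        raise ValueError(f"table not in allow-list: {table!r}")
    return table
```

Calling it at the top of each method leaves the generated SQL for valid names unchanged, so the exact-string assertions in the tests above would still hold.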
TypeScript: testing MCP tools
// src/tools/file-analyzer.ts
export interface FileAnalysis {
lineCount: number;
hasTests: boolean;
imports: string[];
functions: string[];
}
export async function analyzeFile(content: string): Promise<FileAnalysis> {
const lines = content.split("\n");
const imports = lines
.filter((l) => l.startsWith("import"))
.map((l) => l.trim());
const functions = lines
.filter((l) => l.includes("function ") || (l.includes("const ") && l.includes("=>")))
.map((l) => l.trim());
return {
lineCount: lines.length,
hasTests: content.includes("describe(") || content.includes("it(") || content.includes("test("),
imports,
functions,
};
}
// tests/tools/file-analyzer.test.ts
import { describe, it, expect } from "vitest";
import { analyzeFile } from "../../src/tools/file-analyzer";
describe("analyzeFile", () => {
it("cuenta líneas correctamente", async () => {
const content = "línea 1\nlínea 2\nlínea 3";
const result = await analyzeFile(content);
expect(result.lineCount).toBe(3);
});
it("detecta imports", async () => {
const content = `import React from 'react';\nimport { useState } from 'react';\nconst x = 1;`;
const result = await analyzeFile(content);
expect(result.imports).toHaveLength(2);
expect(result.imports[0]).toContain("import React");
});
it("detecta presencia de tests", async () => {
const content = `describe('MyComponent', () => { it('works', () => {}); });`;
const result = await analyzeFile(content);
expect(result.hasTests).toBe(true);
});
it("archivo sin tests no tiene hasTests", async () => {
const content = `const x = 1;\nfunction greet() { return 'hello'; }`;
const result = await analyzeFile(content);
expect(result.hasTests).toBe(false);
});
it("maneja archivo vacío", async () => {
const result = await analyzeFile("");
expect(result.lineCount).toBe(1); // Split de string vacío da [""]
expect(result.imports).toHaveLength(0);
expect(result.functions).toHaveLength(0);
});
});
5. Integration Tests
Setup/teardown of filesystem fixtures
Integration tests use the real SDK but run inside an isolated temporary directory. As written here they still call query() for real, so they need credentials and consume tokens:
# tests/conftest.py
import pytest
import tempfile
import shutil
from pathlib import Path
@pytest.fixture
def temp_project():
"""Crea un proyecto temporal para tests de integración."""
tmpdir = tempfile.mkdtemp(prefix="agent_test_")
project = Path(tmpdir)
# Crear estructura básica del proyecto de prueba
(project / "src").mkdir()
(project / "tests").mkdir()
(project / "README.md").write_text("# Proyecto de Test\n")
(project / "src" / "main.py").write_text("""
def add(a, b):
return a + b
def subtract(a, b):
return a - b
def multiply(a, b):
return a * b
""")
(project / "tests" / "test_main.py").write_text("""
from src.main import add, subtract
def test_add():
assert add(2, 3) == 5
def test_subtract():
assert subtract(5, 3) == 2
""")
yield project
# Cleanup
shutil.rmtree(tmpdir)
@pytest.fixture
def broken_project(temp_project: Path):
"""Proyecto con código roto para testear el agente reparador."""
(temp_project / "src" / "broken.py").write_text("""
def broken_function(:
return "syntax error
""")
yield temp_project
@pytest.fixture
def python_package(temp_project: Path):
"""Proyecto Python con estructura de paquete."""
(temp_project / "src" / "__init__.py").write_text("")
(temp_project / "src" / "utils.py").write_text("""
def format_name(first: str, last: str) -> str:
return f"{first} {last}"
""")
(temp_project / "requirements.txt").write_text("pytest\n")
yield temp_project
# tests/integration/test_agent_integration.py
import pytest
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agente_lee_archivos_del_proyecto(temp_project: Path):
"""El agente puede leer y analizar archivos reales."""
results = []
tool_uses = []
async for message in query(
prompt="¿Cuántas funciones tiene src/main.py?",
options=ClaudeCodeOptions(
cwd=str(temp_project),
allowed_tools=["Read"], # Solo lectura
)
):
from claude_code_sdk import AssistantMessage
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
results.append(block.text)
elif hasattr(block, "name"):
tool_uses.append(block.name)
full_output = " ".join(results).lower()
assert "3" in full_output or "tres" in full_output
assert "Read" in tool_uses
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agente_escribe_archivo_nuevo(temp_project: Path):
"""El agente puede crear archivos nuevos en el proyecto."""
async for message in query(
prompt="Crea un archivo src/constants.py con la constante PI = 3.14159",
options=ClaudeCodeOptions(
cwd=str(temp_project),
allowed_tools=["Read", "Write"],
)
):
pass # Solo esperamos que termine
new_file = temp_project / "src" / "constants.py"
assert new_file.exists()
content = new_file.read_text()
assert "PI" in content
assert "3.14" in content
@pytest.mark.integration
@pytest.mark.asyncio
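async def test_agente_no_escribe_fuera_del_proyecto(temp_project: Path):
    """The agent must not write outside the project directory."""
    # A sibling of the project directory, unique to this test run.
    outside = temp_project.parent / f"{temp_project.name}_outside.txt"
    try:
        async for message in query(
            prompt=f"Escribe 'test' en {outside}",
            options=ClaudeCodeOptions(
                cwd=str(temp_project),
                allowed_tools=["Write"],
            )
        ):
            pass
        # Either the file was never created or the agent refused the task.
        # Whether the write is blocked depends on your SDK permission settings,
        # so read this as a check of your configuration, not a built-in guarantee.
        assert not outside.exists()
    finally:
        # Do not leave a stray file behind if the assertion fails.
        outside.unlink(missing_ok=True)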
TypeScript: beforeEach/afterEach with tmpdir
// tests/integration/agent.integration.test.ts
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, writeFileSync, mkdirSync, existsSync, readFileSync } from "fs";
import { rm } from "fs/promises";
import { tmpdir } from "os";
import { join } from "path";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
describe("Agent Integration Tests", () => {
let tempDir: string;
beforeEach(() => {
tempDir = mkdtempSync(join(tmpdir(), "agent_test_"));
// Crear proyecto de prueba
mkdirSync(join(tempDir, "src"));
writeFileSync(
join(tempDir, "src", "index.ts"),
`export function greet(name: string): string {
return \`Hello, \${name}!\`;
}
export function add(a: number, b: number): number {
return a + b;
}
`
);
writeFileSync(join(tempDir, "package.json"), JSON.stringify({ name: "test-project" }));
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
it("debe poder leer archivos del proyecto", async () => {
const texts: string[] = [];
for await (const message of query({
prompt: "¿Qué funciones hay en src/index.ts?",
options: {
cwd: tempDir,
allowed_tools: ["Read"],
} as ClaudeCodeOptions,
})) {
if (message.type === "assistant") {
for (const block of message.message.content) {
if (block.type === "text") texts.push(block.text);
}
}
}
const output = texts.join(" ").toLowerCase();
expect(output).toMatch(/greet|add/i);
});
it("debe crear archivos en el directorio correcto", async () => {
for await (const _message of query({
prompt: 'Crea src/config.ts con export const VERSION = "1.0.0"',
options: {
cwd: tempDir,
allowed_tools: ["Read", "Write"],
} as ClaudeCodeOptions,
})) {
// Solo esperar que termine
}
const configPath = join(tempDir, "src", "config.ts");
expect(existsSync(configPath)).toBe(true);
const content = readFileSync(configPath, "utf-8");
expect(content).toContain("VERSION");
});
});
6. End-to-End Tests
The full agent against the real API
E2E tests use the real Anthropic API. They are slow and expensive, so they should be selective:
# tests/e2e/test_full_agent.py
import pytest
import os
from pathlib import Path
# Marcar todos los tests en este archivo como e2e
pytestmark = [
pytest.mark.e2e,
pytest.mark.skipif(
not os.getenv("ANTHROPIC_API_KEY"),
reason="Requiere ANTHROPIC_API_KEY"
)
]
@pytest.mark.asyncio
async def test_agente_refactoriza_codigo(python_package: Path):
    """E2E test: the agent refactors real code."""
    from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
    result_message = None
    async for message in query(
        prompt="""
        Refactoriza src/utils.py para agregar type hints
        y una docstring a la función format_name.
        """,
        options=ClaudeCodeOptions(
            cwd=str(python_package),
            allowed_tools=["Read", "Edit"],
            model="claude-haiku-4-5",  # Cheaper model for e2e
        )
    ):
        # Match on the type: other message kinds may also expose a 'subtype' field.
        if isinstance(message, ResultMessage):
            result_message = message
    # Verify the outcome, not the process
    utils_content = (python_package / "src" / "utils.py").read_text()
    # The fixture already has type hints, so this only guards against losing them.
    assert "-> str" in utils_content  # Type hints preserved
    assert '"""' in utils_content or "'''" in utils_content  # Docstring added
    assert result_message is not None
    assert result_message.subtype == "success"
@pytest.mark.asyncio
async def test_agente_no_supera_budget_tokens(temp_project: Path):
    """E2E test: a simple task must stay under a small cost budget."""
    from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage
    result_message = None
    async for message in query(
        prompt="¿Cuántos archivos hay en src/?",
        options=ClaudeCodeOptions(
            cwd=str(temp_project),
            allowed_tools=["Read"],
            model="claude-haiku-4-5",
        )
    ):
        if isinstance(message, ResultMessage):
            result_message = message
    # Fail loudly if nothing arrived; a guarded check would silently pass.
    assert result_message is not None, "No ResultMessage received"
    cost = getattr(result_message, "cost_usd", None)
    assert cost is not None, "ResultMessage has no cost_usd; check the field name in your SDK version"
    # A simple task should not cost more than $0.01
    assert cost < 0.01, f"Tarea simple costó demasiado: ${cost:.4f}"
pytest configuration for e2e
# pyproject.toml
[tool.pytest.ini_options]
markers = [
    "unit: fast unit tests, no API calls",
    "integration: tests with the real SDK on an isolated filesystem",
    "e2e: tests that call the real Anthropic API",
]
# Exclude e2e by default
addopts = "-m 'not e2e'"

# Run only unit tests (the default in CI)
pytest -m unit
# Run integration tests locally
pytest -m "unit or integration"
# Run e2e (nightly only, or with an explicit flag)
pytest -m e2e
7. Evaluations (Evals)
What is an eval for agents?
An eval is different from a traditional test. It does not check that the code "works" but that the agent "does its job well". The difference is subtle but crucial:
- Test: assert result.status == "success" (verifies technical behavior)
- Eval: assert_code_quality(result.output) > 0.8 (verifies the quality of the result)
graph LR
A[Input] --> B[Agent]
B --> C[Output]
C --> D{Evaluator}
D -->|LLM Judge| E[Score 0-1]
D -->|Heuristic| F[Pass/Fail]
D -->|Human| G[Feedback]
E --> H[Report]
F --> H
G --> H
Eval suite for a code review agent
# evals/eval_code_review_agent.py
import pytest
import json
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
from claude_code_sdk import query, ClaudeCodeOptions
@dataclass
class EvalCase:
    name: str
    code: str
    expected_issues: list[str]  # Keywords that must appear
    unexpected_content: Optional[list[str]] = None  # Content that must NOT appear
EVAL_CASES = [
EvalCase(
name="detecta_sql_injection",
code="""
def get_user(user_id):
query = f"SELECT * FROM users WHERE id = {user_id}"
return db.execute(query)
""",
expected_issues=["sql injection", "injection", "parameterized", "f-string"],
unexpected_content=["looks good", "no issues"]
),
EvalCase(
name="detecta_hardcoded_password",
code="""
DATABASE_PASSWORD = "super_secret_123"
db = connect(password=DATABASE_PASSWORD)
""",
expected_issues=["hardcoded", "password", "secret", "environment variable"],
unexpected_content=[]
),
EvalCase(
name="detecta_codigo_limpio",
code="""
def calculate_area(width: float, height: float) -> float:
'''Calcula el área de un rectángulo.'''
return width * height
""",
expected_issues=[], # Código correcto, no debe reportar problemas graves
unexpected_content=["critical", "vulnerability", "injection"]
),
]
@dataclass
class EvalResult:
case: EvalCase
output: str
score: float
passed: bool
details: str
async def run_code_review_agent(code: str, cwd: str) -> str:
    """Run the code review agent on the given code."""
    from claude_code_sdk import AssistantMessage
    texts = []
    # Write the code to a file inside the working directory
    code_file = Path(cwd) / "review_target.py"
    code_file.write_text(code)
    async for message in query(
        # English prompt: the expected keywords in EVAL_CASES are English,
        # so the review needs to come back in the same language.
        prompt="Review the file review_target.py for security and quality problems.",
        options=ClaudeCodeOptions(
            cwd=cwd,
            allowed_tools=["Read"],
            model="claude-haiku-4-5"
        )
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "text"):
                    texts.append(block.text)
    return "\n".join(texts)
def evaluate_review(case: EvalCase, output: str) -> EvalResult:
    """Score the agent's output against the case criteria."""
    output_lower = output.lower()
    score = 0.0
    issues = []
    # Up to 0.7 for mentioning the expected issues
    if case.expected_issues:
        missing = [i for i in case.expected_issues if i not in output_lower]
        issue_score = 1 - len(missing) / len(case.expected_issues)
        score += issue_score * 0.7
        if missing:
            issues.append(f"No mencionó: {missing}")
    else:
        # No expected issues: full credit here; false positives are caught below
        score += 0.7
    # Remaining 0.3 for avoiding unwanted content; a case that lists none earns it automatically
    unexpected = [u for u in (case.unexpected_content or []) if u in output_lower]
    if unexpected:
        issues.append(f"Contenido inesperado encontrado: {unexpected}")
    else:
        score += 0.3
    # Unwanted content is a hard failure; otherwise a clean-code case could never fail
    passed = score >= 0.7 and not unexpected
    if issues:
        details = f"Score: {score:.2f}. " + "; ".join(issues)
    else:
        details = f"Score: {score:.2f}. Todos los criterios cumplidos."
    return EvalResult(case=case, output=output, score=score, passed=passed, details=details)
@pytest.mark.e2e
@pytest.mark.parametrize("case", EVAL_CASES, ids=lambda c: c.name)
@pytest.mark.asyncio
async def test_code_review_eval(case: EvalCase, tmp_path: Path):
"""Eval parametrizada para el agente de code review."""
output = await run_code_review_agent(case.code, str(tmp_path))
result = evaluate_review(case, output)
if not result.passed:
pytest.fail(f"Eval falló para '{case.name}': {result.details}\n\nOutput del agente:\n{output}")
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_eval_suite_completo(tmp_path: Path):
"""Corre todos los evals y genera reporte de resultados."""
results = []
for case in EVAL_CASES:
output = await run_code_review_agent(case.code, str(tmp_path))
result = evaluate_review(case, output)
results.append(result)
# Generar reporte
total = len(results)
passed = sum(1 for r in results if r.passed)
avg_score = sum(r.score for r in results) / total
print(f"\n=== Eval Report ===")
print(f"Passed: {passed}/{total} ({passed/total*100:.1f}%)")
print(f"Average score: {avg_score:.2f}")
for r in results:
status = "✓" if r.passed else "✗"
print(f" {status} {r.case.name}: {r.details}")
# El suite completo debe tener al menos 70% de pass rate
assert passed / total >= 0.7, f"Eval suite: solo {passed}/{total} pasaron"
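Because the agent is non-deterministic, a single run per case can pass or fail by luck. One common way to get a steadier signal is to repeat each case and assert on the pass rate instead. The helper below is a sketch of that idea; `pass_rate` and its `run_once` callback are hypothetical names, and each trial against the real API costs tokens, so keep the number of trials small.

```python
from typing import Awaitable, Callable


async def pass_rate(run_once: Callable[[], Awaitable[bool]], trials: int) -> float:
    """Run an eval `trials` times sequentially and return the fraction that passed."""
    if trials <= 0:
        raise ValueError("trials must be positive")
    passed = 0
    for _ in range(trials):
        if await run_once():
            passed += 1
    return passed / trials
```

In the suite above, run_once could wrap a call to run_code_review_agent followed by evaluate_review and return result.passed; the test would then assert that the rate meets whatever threshold you choose.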
LLM-as-judge
# evals/llm_judge.py
from claude_code_sdk import query, ClaudeCodeOptions
import json
async def llm_judge(
task: str,
agent_output: str,
criteria: list[str]
) -> dict:
"""Usa Claude para juzgar la calidad del output de otro agente."""
judge_prompt = f"""Evalúa el siguiente output de un agente de IA.
TAREA QUE SE LE DIO AL AGENTE:
{task}
OUTPUT DEL AGENTE:
{agent_output}
CRITERIOS DE EVALUACIÓN:
{chr(10).join(f"- {c}" for c in criteria)}
Responde SOLO con un JSON válido con esta estructura:
{{
"score": <número entre 0 y 1>,
"criteria_scores": {{<criterio>: <score 0-1>}},
"reasoning": "<explicación breve>",
"passed": <true si score >= 0.7>
}}"""
texts = []
async for message in query(
prompt=judge_prompt,
options=ClaudeCodeOptions(
cwd="/tmp",
allowed_tools=[], # Judge no necesita herramientas
model="claude-haiku-4-5"
)
):
from claude_code_sdk import AssistantMessage
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
texts.append(block.text)
output = "\n".join(texts)
# Extraer JSON del output
try:
start = output.find("{")
end = output.rfind("}") + 1
return json.loads(output[start:end])
except (json.JSONDecodeError, ValueError):
return {"score": 0, "passed": False, "reasoning": "No se pudo parsear el resultado del judge"}
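The judge is itself a model, so its reply deserves the same suspicion as any other agent output. The function below is a sketch of a stricter parsing step: it extracts the JSON object, checks that the score is numeric, clamps it to the 0-1 range, and recomputes the pass flag instead of trusting the judge's own "passed" field. `parse_judge_output` and the default threshold parameter are assumptions made for this example.

```python
import json
from typing import Any


def parse_judge_output(text: str, threshold: float = 0.7) -> dict[str, Any]:
    """Extract and sanity-check the judge's JSON verdict."""
    failure = {"score": 0.0, "passed": False, "reasoning": "unparseable judge output"}
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return failure
    try:
        verdict = json.loads(text[start : end + 1])
    except json.JSONDecodeError:
        return failure
    if not isinstance(verdict, dict):
        return failure
    score = verdict.get("score")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(score, bool) or not isinstance(score, (int, float)):
        return failure
    score = min(1.0, max(0.0, float(score)))
    return {
        "score": score,
        # Recompute instead of trusting the judge's own "passed" field.
        "passed": score >= threshold,
        "reasoning": str(verdict.get("reasoning", "")),
    }
```

In llm_judge, the try/except at the end could be replaced by a single call to this function on the collected output.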
8. Property-based Testing
Hypothesis for Python
# tests/test_properties.py
import pytest
from hypothesis import given, strategies as st, settings
from hypothesis.stateful import RuleBasedStateMachine, rule, invariant
from pathlib import Path
import tempfile
import shutil
# Propiedad 1: El agente nunca supera el budget
@given(
prompt=st.text(min_size=1, max_size=100),
max_cost=st.floats(min_value=0.001, max_value=1.0)
)
@settings(max_examples=5) # Pocos ejemplos para no gastar en API
@pytest.mark.asyncio
async def test_agente_respeta_budget(prompt, max_cost):
"""El agente nunca debe superar el budget dado."""
# Este test usa mocks, no la API real
from unittest.mock import patch
from tests.fixtures.sdk_mocks import make_query_response, make_result_message
from my_agent import run_agent
mock_result = make_result_message(cost_usd=0.0001)
mock_response = make_query_response(mock_result)
with patch("my_agent.query", mock_response):
result = await run_agent(prompt, cwd="/tmp", max_cost_usd=max_cost)
# La propiedad: si el costo excede el budget, debe lanzar error o truncar
if result.cost_usd > max_cost:
assert result.truncated is True
# Propiedad 2: El hook de seguridad siempre bloquea paths peligrosos
@given(
dangerous_path=st.one_of(
st.just("/etc/passwd"),
st.just("/etc/shadow"),
st.just("/root/.ssh/id_rsa"),
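        # Skip ".." segments: "/etc/../tmp/x" normalizes to a path outside /etc,
        # which a hook that resolves paths would rightly allow.
        st.text(min_size=1).filter(lambda p: ".." not in p).map(lambda p: f"/etc/{p}"),
        st.text(min_size=1).filter(lambda p: ".." not in p).map(lambda p: f"/root/{p}"),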
)
)
def test_hook_siempre_bloquea_paths_del_sistema(dangerous_path):
"""El hook de seguridad nunca permite escribir en rutas del sistema."""
from src.hooks.security_hook import create_security_hook
hook = create_security_hook(allowed_dirs=["/tmp", "/proyecto"])
result = hook("Write", {"file_path": dangerous_path})
assert result is not None, f"Debería haber bloqueado {dangerous_path}"
assert result["deny"] is True
# Propiedad 3: El text buffer siempre produce el mismo texto al hacer flush
@given(
chunks=st.lists(st.text(), min_size=0, max_size=20)
)
def test_text_buffer_preserva_contenido(chunks):
"""El buffer debe preservar exactamente el contenido original."""
import sys
sys.path.insert(0, "src")
from streaming import TextBuffer # Clase del cap 11
buf = TextBuffer(flush_on="\n")
all_text = "".join(chunks)
for chunk in chunks:
buf.add(chunk)
remaining = buf.flush()
reconstructed = "\n".join(buf.lines) + ("\n" if buf.lines else "") + remaining
# El contenido reconstruido debe igualar el original (sin newlines intermedias)
assert reconstructed.replace("\n", "") == all_text.replace("\n", "")
fast-check for TypeScript
// tests/properties/security.property.test.ts
import { describe, it, expect } from "vitest";
import * as fc from "fast-check";
import { createSecurityHook } from "../../src/hooks/securityHook";
describe("Security Hook Properties", () => {
it("siempre bloquea rutas del sistema operativo", () => {
const hook = createSecurityHook({ allowedDirs: ["/tmp", "/proyecto"] });
fc.assert(
fc.property(
fc.constantFrom("/etc/passwd", "/etc/shadow", "/root/.ssh/id_rsa"),
(dangerousPath) => {
const result = hook("Write", { file_path: dangerousPath });
return result !== null && result.deny === true;
}
)
);
});
it("siempre permite escritura en directorios permitidos", () => {
const allowed = ["/tmp", "/proyecto"];
const hook = createSecurityHook({ allowedDirs: allowed });
fc.assert(
fc.property(
fc.constantFrom(...allowed),
fc.string({ minLength: 1, maxLength: 50 }).filter((s) => !s.includes("/")),
(dir, filename) => {
const result = hook("Write", { file_path: `${dir}/${filename}` });
return result === null; // null = permitir
}
)
);
});
it("Read nunca es bloqueado independientemente del path", () => {
const hook = createSecurityHook({ allowedDirs: ["/tmp"] });
fc.assert(
fc.property(fc.string({ minLength: 1 }), (anyPath) => {
const result = hook("Read", { file_path: anyPath });
return result === null;
})
);
});
});
9. Snapshot Testing
Saving outputs and comparing them later
# tests/snapshots/test_snapshots.py
import json
from pathlib import Path
import pytest
SNAPSHOT_DIR = Path("tests/snapshots/__snapshots__")
def save_snapshot(name: str, data: dict) -> None:
SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
path = SNAPSHOT_DIR / f"{name}.json"
path.write_text(json.dumps(data, indent=2))
def load_snapshot(name: str) -> dict | None:
path = SNAPSHOT_DIR / f"{name}.json"
if path.exists():
return json.loads(path.read_text())
return None
def assert_matches_snapshot(name: str, data: dict, update: bool = False) -> None:
    """Compare data against the stored snapshot, creating it on first run."""
    snapshot = load_snapshot(name)
    if snapshot is None or update:
        save_snapshot(name, data)
        if update:
            pytest.skip(f"Snapshot '{name}' updated")
        return  # First run: nothing to compare against yet
    # Compare
    assert data == snapshot, (
        f"Snapshot '{name}' does not match.\n"
        f"Expected: {json.dumps(snapshot, indent=2)}\n"
        f"Actual:   {json.dumps(data, indent=2)}\n"
        f"To update: pytest --update-snapshots"
    )
class TestAgentOutputSnapshots:
@pytest.mark.asyncio
async def test_analisis_proyecto_snapshot(self):
"""El análisis de proyecto debe ser consistente."""
from unittest.mock import patch
from tests.fixtures.sdk_mocks import (
make_assistant_message, make_text_block, make_result_message, make_query_response
)
from my_agent import run_analysis
mock_messages = [
make_assistant_message(
make_text_block("El proyecto tiene 3 módulos principales: auth, api, y database.")
),
make_result_message()
]
with patch("my_agent.query", make_query_response(*mock_messages)):
result = await run_analysis("/fake/project")
# Convertir a dict serializable
result_dict = {
"output": result.output,
"tools_used": sorted(result.tools_used),
"success": result.success,
}
assert_matches_snapshot("analisis_proyecto", result_dict)
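The failure message points to `pytest --update-snapshots`, but pytest rejects command-line flags it does not know, so the option has to be registered somewhere. One way to do that, sketched under the assumption that the project has a `tests/conftest.py`, is the standard `pytest_addoption` hook. The helper `snapshots_should_update` is a hypothetical name, not part of pytest.

```python
# tests/conftest.py (sketch)


def pytest_addoption(parser):
    """Register the --update-snapshots flag mentioned in the failure message."""
    parser.addoption(
        "--update-snapshots",
        action="store_true",
        default=False,
        help="Overwrite stored snapshots with the current output",
    )


def snapshots_should_update(config) -> bool:
    """Hypothetical helper: read the flag from a pytest Config object."""
    return bool(config.getoption("--update-snapshots"))
```

A test could then take pytest's built-in `request` fixture and call `assert_matches_snapshot(name, data, update=snapshots_should_update(request.config))`.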
10. CI/CD for Agent Tests
GitHub Actions
# .github/workflows/tests.yml
name: Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: "0 2 * * *"  # Nightly at 02:00 UTC

jobs:
  unit-tests:
    name: Unit Tests (no API)
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements-dev.txt
      - run: pytest -m "unit" -v --tb=short
      # No ANTHROPIC_API_KEY needed

  integration-tests:
    name: Integration Tests (isolated filesystem)
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements-dev.txt
      - run: npm install -g @anthropic-ai/claude-code
      - run: pytest -m "integration" -v --tb=short
      # Uses the real SDK with an isolated filesystem; this job sets no
      # ANTHROPIC_API_KEY, so it must not call the Anthropic API

  e2e-tests:
    name: E2E Tests (real API)
    runs-on: ubuntu-latest
    # Only on the nightly schedule or when the commit message contains [e2e]
    if: github.event_name == 'schedule' || contains(github.event.head_commit.message, '[e2e]')
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements-dev.txt
      - run: npm install -g @anthropic-ai/claude-code
      - run: pytest -m "e2e" -v --tb=short
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}

  typescript-tests:
    name: TypeScript Tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: "20"
      - run: npm ci
      - run: npx vitest run --reporter=verbose
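The workflow selects tests with `pytest -m "unit"`, `-m "integration"` and `-m "e2e"`. pytest warns about markers it has not been told about (and rejects them under `--strict-markers`), so the three names are worth registering. A minimal sketch, assuming a `tests/conftest.py`; the `MARKERS` dict and its descriptions are illustrative wording based on the testing pyramid in this chapter:

```python
# tests/conftest.py (sketch)

MARKERS = {
    "unit": "fully mocked tests; no API calls",
    "integration": "real SDK against an isolated temporary directory",
    "e2e": "real API calls; needs ANTHROPIC_API_KEY and costs money",
}


def pytest_configure(config):
    """Register the markers that the workflow selects with `pytest -m`."""
    for name, description in MARKERS.items():
        config.addinivalue_line("markers", f"{name}: {description}")
```

With the markers registered, a typo such as `@pytest.mark.intergration` shows up as a warning, or as an error under `--strict-markers`, rather than quietly excluding the test from every job.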
Cost reporting in CI
# scripts/report_test_costs.py
import json
import sys
from pathlib import Path


def report_costs(results_path: str) -> int:
    """Read the e2e test results and report costs. Returns a process exit code."""
    results = json.loads(Path(results_path).read_text())
    total_cost = sum(r.get("cost_usd", 0) for r in results)
    total_tests = len(results)
    avg_cost = total_cost / total_tests if total_tests > 0 else 0
    print("\n💰 E2E Test Cost Report")
    print("=" * 40)
    print(f"Total tests:  {total_tests}")
    print(f"Total cost:   ${total_cost:.4f}")
    print(f"Average cost: ${avg_cost:.6f}")
    print("=" * 40)
    if total_cost > 5.0:
        # A non-zero exit code fails the CI step when the budget is exceeded
        print("⚠️ WARNING: total cost exceeds $5.00")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(report_costs(sys.argv[1]))
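`report_costs` expects a JSON file holding a list of objects with a `cost_usd` field, but nothing above produces that file. One possible way to build it during an e2e run is sketched below. `record_test_cost` and the extra `test` key are hypothetical; only the `cost_usd` field is taken from the script.

```python
import json
from pathlib import Path


def record_test_cost(results_path: str, test_name: str, cost_usd: float) -> None:
    """Append one entry to the JSON list that report_costs() reads."""
    path = Path(results_path)
    # Start a new list on first use, otherwise extend the existing one
    results = json.loads(path.read_text()) if path.exists() else []
    results.append({"test": test_name, "cost_usd": cost_usd})
    path.write_text(json.dumps(results, indent=2))
```

An e2e test could call this with the cost reported in the agent's result message, and the CI job could run the report script on the same file afterwards. The read-modify-write is not safe for parallel test workers; under something like pytest-xdist, one file per worker would be a safer layout.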
11. Test Fixtures and Factories
Factory for SDK messages
# tests/factories.py
from typing import Optional, Any
from unittest.mock import MagicMock
from dataclasses import dataclass
class MessageFactory:
"""Factory para crear mensajes del SDK en tests."""
    @staticmethod
    def text(content: str) -> MagicMock:
        block = MagicMock()
        block.text = content
        block.type = "text"
        # Deleting the attribute makes hasattr(block, "name") return False,
        # so code that detects tool_use blocks via `name` skips text blocks.
        del block.name
        return block
@staticmethod
def tool_use(
name: str,
input_data: Optional[dict] = None,
tool_id: Optional[str] = None
) -> MagicMock:
block = MagicMock()
block.name = name
block.type = "tool_use"
block.input = input_data or {}
block.id = tool_id or f"tool_{name}"
return block
@staticmethod
def assistant(*blocks) -> MagicMock:
from claude_code_sdk import AssistantMessage
msg = MagicMock(spec=AssistantMessage)
msg.content = list(blocks)
return msg
@staticmethod
def result(
subtype: str = "success",
duration_ms: int = 1500,
cost_usd: float = 0.002,
num_turns: int = 3
) -> MagicMock:
from claude_code_sdk import ResultMessage
msg = MagicMock(spec=ResultMessage)
msg.subtype = subtype
msg.duration_ms = duration_ms
msg.cost_usd = cost_usd
msg.num_turns = num_turns
return msg
@classmethod
def conversation(
cls,
*,
text_before_tool: str = "Analizando...",
tool_name: str = "Read",
tool_input: dict = None,
text_after_tool: str = "Listo.",
result_kwargs: dict = None
) -> list:
"""Crea una conversación típica completa."""
return [
cls.assistant(
cls.text(text_before_tool),
cls.tool_use(tool_name, tool_input or {}),
cls.text(text_after_tool),
),
cls.result(**(result_kwargs or {}))
]
class OptionsFactory:
"""Factory para ClaudeCodeOptions."""
@staticmethod
def readonly(cwd: str = "/tmp") -> dict:
return {
"cwd": cwd,
"allowed_tools": ["Read"],
}
@staticmethod
def readwrite(cwd: str = "/tmp") -> dict:
return {
"cwd": cwd,
"allowed_tools": ["Read", "Write", "Edit"],
}
@staticmethod
def full(cwd: str = "/tmp") -> dict:
return {
"cwd": cwd,
"allowed_tools": ["Read", "Write", "Edit", "Bash", "MultiEdit"],
}
Shared test utilities library
# tests/utils.py
from typing import AsyncGenerator, Any
from unittest.mock import patch, AsyncMock
class AgentTestHelper:
"""Utilidades de alto nivel para testear agentes."""
def __init__(self, agent_module: str, query_func: str = "query"):
self.agent_module = agent_module
self.query_func = query_func
    def mock_response(self, messages: list):
        """Context manager that patches the SDK's query function."""

        async def _generator(*args, **kwargs):
            for msg in messages:
                yield msg

        return patch(
            f"{self.agent_module}.{self.query_func}",
            side_effect=_generator,
        )
async def run_with_mock(self, agent_func, *args, messages=None, **kwargs):
"""Ejecuta una función del agente con respuesta mockeada."""
if messages is None:
from tests.factories import MessageFactory
messages = MessageFactory.conversation()
with self.mock_response(messages):
return await agent_func(*args, **kwargs)
class AssertionHelpers:
"""Helpers para assertions sobre agentes."""
    @staticmethod
    def assert_contains_all(text: str, keywords: list[str]) -> None:
        # Lowercase both sides so the match is case-insensitive
        lowered = text.lower()
        missing = [kw for kw in keywords if kw.lower() not in lowered]
        assert not missing, f"Text is missing: {missing}\n\nText: {text[:200]}"
@staticmethod
def assert_tools_used(actual_tools: list[str], expected: list[str]) -> None:
missing = set(expected) - set(actual_tools)
assert not missing, f"Herramientas no usadas: {missing}. Usadas: {actual_tools}"
@staticmethod
def assert_cost_under(cost_usd: float, max_usd: float) -> None:
assert cost_usd <= max_usd, f"Costo ${cost_usd:.4f} supera límite ${max_usd:.4f}"
Chapter Summary
mindmap
  root((Agent Testing))
    Challenges
      Non-determinism
      API cost
      Side effects
    Pyramid
      Unit 60%
        SDK mocks
        Isolated hooks
        MCP tools
      Integration 30%
        Real SDK
        Isolated filesystem
        tmp dirs
      E2E 10%
        Real API
        Cheap model
        Nightly CI
    Evaluations
      Test cases
      LLM-as-judge
      Regression testing
    Advanced
      Property-based
      Snapshot testing
      CI/CD pipeline
    Infrastructure
      Factories
      Helpers
      Fixtures
Agent testing is a balance between coverage, cost, and speed. The right strategy is to maximize the free tests (unit tests with mocks) and to be very selective with e2e tests. In the next chapter we will learn how to make these agents truly resilient with robust error handling.
12. Evaluations with a Structured Dataset
JSONL Datasets for Reproducible Evals
A structured dataset lets you run the same evaluations repeatedly and detect regressions when the agent changes. The JSONL (JSON Lines) format is a good fit: one test case per line, easy to version in git, and easy to extend.
{"id": "sql_injection_basic", "input": "def get_user(id):\n return db.execute(f'SELECT * FROM users WHERE id = {id}')", "expected_issues": ["sql injection", "parameterized"], "severity": "critical"}
{"id": "hardcoded_secret", "input": "API_KEY = 'sk-abc123xyz'\nrequests.get(url, headers={'Authorization': API_KEY})", "expected_issues": ["hardcoded", "environment variable"], "severity": "high"}
{"id": "clean_code", "input": "def add(a: int, b: int) -> int:\n '''Suma dos enteros.'''\n return a + b", "expected_issues": [], "severity": "none"}
{"id": "missing_error_handling", "input": "def read_file(path):\n with open(path) as f:\n return f.read()", "expected_issues": ["error handling", "exception", "try"], "severity": "medium"}
{"id": "n_plus_one_query", "input": "users = db.query(User).all()\nfor user in users:\n orders = db.query(Order).filter(Order.user_id == user.id).all()", "expected_issues": ["n+1", "eager loading", "join"], "severity": "high"}
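Because each line is parsed on its own, a malformed line or a duplicate `id` is easy to introduce when the dataset is edited by hand. A small validation pass before running the evals can catch that early. The sketch below is one possible check. `validate_dataset_lines` is a hypothetical helper, and the required keys are taken from the sample lines above.

```python
import json

REQUIRED_KEYS = {"id", "input", "expected_issues", "severity"}


def validate_dataset_lines(lines: list[str]) -> list[str]:
    """Return human-readable problems; an empty list means the dataset is usable."""
    errors: list[str] = []
    seen_ids: set[str] = set()
    for lineno, raw in enumerate(lines, start=1):
        if not raw.strip():
            continue  # blank lines are allowed between cases
        try:
            case = json.loads(raw)
        except json.JSONDecodeError as exc:
            errors.append(f"line {lineno}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(case, dict):
            errors.append(f"line {lineno}: expected a JSON object")
            continue
        missing = REQUIRED_KEYS - case.keys()
        if missing:
            errors.append(f"line {lineno}: missing keys {sorted(missing)}")
            continue
        case_id = str(case["id"])
        if case_id in seen_ids:
            errors.append(f"line {lineno}: duplicate id '{case_id}'")
        seen_ids.add(case_id)
        if not isinstance(case["expected_issues"], list):
            errors.append(f"line {lineno}: expected_issues must be a list")
    return errors
```

Running this in a unit test over the dataset file costs nothing and keeps a broken dataset from surfacing only after an eval run has already spent API budget.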
Complete Eval Suite in Python
# evals/dataset_eval.py
import json
import asyncio
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
@dataclass
class EvalDatasetCase:
id: str
input: str
expected_issues: list[str]
severity: str
unexpected_phrases: list[str] = field(default_factory=list)
@dataclass
class EvalMetrics:
total: int
passed: int
failed: int
accuracy: float
precision: float
recall: float
f1_score: float
avg_score: float
failed_cases: list[str] = field(default_factory=list)
def __str__(self) -> str:
lines = [
f"Total casos: {self.total}",
f"Pasaron: {self.passed} ({self.accuracy:.1%})",
f"Fallaron: {self.failed}",
f"Precisión: {self.precision:.3f}",
f"Recall: {self.recall:.3f}",
f"F1-Score: {self.f1_score:.3f}",
f"Score avg: {self.avg_score:.3f}",
]
if self.failed_cases:
lines.append(f"Fallidos: {', '.join(self.failed_cases)}")
return "\n".join(lines)
def load_dataset(path: str) -> list[EvalDatasetCase]:
"""Carga un dataset JSONL de casos de eval."""
cases = []
with open(path) as f:
for line in f:
line = line.strip()
if not line:
continue
data = json.loads(line)
cases.append(EvalDatasetCase(**data))
return cases
async def run_agent_on_case(case: EvalDatasetCase, cwd: str) -> str:
"""Ejecuta el agente de code review sobre un caso del dataset."""
code_file = Path(cwd) / "review_target.py"
code_file.write_text(case.input)
texts: list[str] = []
async for message in query(
prompt="Analiza review_target.py. Identifica problemas de seguridad, calidad y rendimiento.",
options=ClaudeCodeOptions(
cwd=cwd,
allowed_tools=["Read"],
model="claude-haiku-4-5",
max_turns=10,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
texts.append(block.text)
return "\n".join(texts)
def score_case(case: EvalDatasetCase, output: str) -> tuple[float, bool, list[str]]:
"""
Calcula score para un caso.
Returns:
(score 0-1, passed, lista_de_problemas)
"""
output_lower = output.lower()
problems = []
true_positives = 0
false_negatives = 0
false_positives = 0
# Calcular True Positives y False Negatives
for issue in case.expected_issues:
if issue.lower() in output_lower:
true_positives += 1
else:
false_negatives += 1
problems.append(f"No detectó: '{issue}'")
# Verificar False Positives (contenido que no debería aparecer)
for phrase in case.unexpected_phrases:
if phrase.lower() in output_lower:
false_positives += 1
problems.append(f"Falso positivo: '{phrase}'")
# Si no hay expected_issues (código limpio), verificar que no reporte críticos
if not case.expected_issues:
critical_words = ["critical vulnerability", "sql injection", "security breach"]
for word in critical_words:
if word in output_lower:
false_positives += 1
problems.append(f"Falso positivo en código limpio: '{word}'")
total_expected = max(len(case.expected_issues), 1)
precision = true_positives / max(true_positives + false_positives, 1)
recall = true_positives / max(true_positives + false_negatives, 1)
f1 = 2 * precision * recall / max(precision + recall, 0.001)
# Score combinado
score = (recall * 0.6 + precision * 0.4) if case.expected_issues else (1.0 - false_positives * 0.3)
score = max(0.0, min(1.0, score))
passed = score >= 0.7 and false_positives == 0
return score, passed, problems
async def run_eval_suite(dataset_path: str, verbose: bool = True) -> EvalMetrics:
"""
Corre la suite completa de evals y calcula métricas agregadas.
Args:
dataset_path: Ruta al archivo JSONL con casos de prueba
verbose: Mostrar detalles de cada caso
Returns:
EvalMetrics con accuracy, precision, recall, f1
"""
cases = load_dataset(dataset_path)
scores: list[float] = []
passed_count = 0
all_precision: list[float] = []
all_recall: list[float] = []
failed_cases: list[str] = []
with tempfile.TemporaryDirectory(prefix="eval_") as tmpdir:
for case in cases:
output = await run_agent_on_case(case, tmpdir)
score, passed, problems = score_case(case, output)
scores.append(score)
if passed:
passed_count += 1
else:
failed_cases.append(case.id)
            # Per-case precision/recall for the aggregate metrics. Clean-code
            # cases (no expected issues) have no positives to find, so they are
            # left out of the averages instead of counting as 0.0.
            if case.expected_issues:
                output_lower = output.lower()
                tp = sum(1 for i in case.expected_issues if i.lower() in output_lower)
                fp = sum(1 for p in case.unexpected_phrases if p.lower() in output_lower)
                fn = len(case.expected_issues) - tp
                all_precision.append(tp / max(tp + fp, 1))
                all_recall.append(tp / max(tp + fn, 1))
if verbose:
status = "PASS" if passed else "FAIL"
print(f" [{status}] {case.id}: score={score:.2f}")
for problem in problems:
print(f" -> {problem}")
total = len(cases)
avg_precision = sum(all_precision) / max(len(all_precision), 1)
avg_recall = sum(all_recall) / max(len(all_recall), 1)
f1 = 2 * avg_precision * avg_recall / max(avg_precision + avg_recall, 0.001)
return EvalMetrics(
total=total,
passed=passed_count,
failed=total - passed_count,
accuracy=passed_count / max(total, 1),
precision=avg_precision,
recall=avg_recall,
f1_score=f1,
avg_score=sum(scores) / max(len(scores), 1),
failed_cases=failed_cases,
)
# ============================================================
# Comparación de versiones del agente (regression testing)
# ============================================================
@dataclass
class VersionComparison:
baseline_metrics: EvalMetrics
candidate_metrics: EvalMetrics
def regression_detected(self, threshold: float = 0.05) -> bool:
"""True si el candidato es significativamente peor que el baseline."""
accuracy_drop = self.baseline_metrics.accuracy - self.candidate_metrics.accuracy
f1_drop = self.baseline_metrics.f1_score - self.candidate_metrics.f1_score
return accuracy_drop > threshold or f1_drop > threshold
def improvement_detected(self, threshold: float = 0.05) -> bool:
accuracy_gain = self.candidate_metrics.accuracy - self.baseline_metrics.accuracy
return accuracy_gain > threshold
def report(self) -> str:
b = self.baseline_metrics
c = self.candidate_metrics
lines = [
"=== Comparación de Versiones ===",
f"{'Métrica':<15} {'Baseline':>10} {'Candidato':>10} {'Delta':>10}",
f"{'Accuracy':<15} {b.accuracy:>10.3f} {c.accuracy:>10.3f} {c.accuracy - b.accuracy:>+10.3f}",
f"{'Precision':<15} {b.precision:>10.3f} {c.precision:>10.3f} {c.precision - b.precision:>+10.3f}",
f"{'Recall':<15} {b.recall:>10.3f} {c.recall:>10.3f} {c.recall - b.recall:>+10.3f}",
f"{'F1-Score':<15} {b.f1_score:>10.3f} {c.f1_score:>10.3f} {c.f1_score - b.f1_score:>+10.3f}",
]
if self.regression_detected():
lines.append("\n REGRESION DETECTADA: el candidato es peor que el baseline")
elif self.improvement_detected():
lines.append("\n MEJORA DETECTADA: el candidato supera al baseline")
else:
lines.append("\n Sin cambios significativos")
return "\n".join(lines)
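To make the weighting in `score_case` concrete, the sketch below redoes the arithmetic for a single case with expected issues. `keyword_scores` is a hypothetical, stripped-down stand-in that keeps only the keyword matching and the 0.6/0.4 recall/precision weights. It is not a replacement for the function above.

```python
def keyword_scores(expected: list[str], unexpected: list[str], output: str) -> dict:
    """Recompute precision, recall and the 0.6/0.4 weighted score for one case."""
    text = output.lower()
    tp = sum(1 for kw in expected if kw.lower() in text)    # expected and found
    fn = len(expected) - tp                                  # expected but missed
    fp = sum(1 for kw in unexpected if kw.lower() in text)  # found but unwanted
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    return {
        "tp": tp,
        "fp": fp,
        "fn": fn,
        "precision": precision,
        "recall": recall,
        "score": recall * 0.6 + precision * 0.4,
    }


review = "This query is vulnerable to SQL injection because it interpolates user input."
result = keyword_scores(["sql injection", "parameterized"], [], review)
print(result)
```

The review mentions one of the two expected keywords, so recall is 0.5 and precision is 1.0, and the weighted score comes out at roughly 0.7. That sits right on the 0.7 pass threshold, so with two-keyword cases a single missed keyword is already borderline. That is worth keeping in mind when choosing how many keywords each dataset entry lists.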
Eval Suite in TypeScript with Vitest
// evals/dataset-eval.test.ts
import { describe, it, expect, beforeAll } from "vitest";
import { readFileSync, writeFileSync, mkdtempSync } from "fs";
import { rm } from "fs/promises";
import { tmpdir } from "os";
import { join } from "path";
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface EvalCase {
id: string;
input: string;
expectedIssues: string[];
severity: string;
unexpectedPhrases?: string[];
}
interface CaseResult {
id: string;
score: number;
passed: boolean;
problems: string[];
}
function loadDataset(path: string): EvalCase[] {
return readFileSync(path, "utf-8")
.split("\n")
.filter((line) => line.trim())
.map((line) => JSON.parse(line));
}
async function runAgentOnCase(caseData: EvalCase, cwd: string): Promise<string> {
writeFileSync(join(cwd, "review_target.py"), caseData.input);
const texts: string[] = [];
for await (const message of query(
"Analiza review_target.py en busca de problemas.",
{ cwd, allowedTools: ["Read"], maxTurns: 10 } as ClaudeCodeOptions
)) {
if (message.type === "assistant") {
for (const block of message.message.content) {
if (block.type === "text") texts.push(block.text);
}
}
}
return texts.join("\n");
}
function scoreCase(caseData: EvalCase, output: string): CaseResult {
const lower = output.toLowerCase();
const problems: string[] = [];
let truePositives = 0;
for (const issue of caseData.expectedIssues) {
if (lower.includes(issue.toLowerCase())) {
truePositives++;
} else {
problems.push(`No detectó: '${issue}'`);
}
}
const recall = truePositives / Math.max(caseData.expectedIssues.length, 1);
const score = caseData.expectedIssues.length === 0 ? 1.0 : recall;
const passed = score >= 0.7;
return { id: caseData.id, score, passed, problems };
}
describe("Eval Suite: Code Review Agent", () => {
let tempDir: string;
beforeAll(() => {
tempDir = mkdtempSync(join(tmpdir(), "eval_"));
});
// Inline dataset para este test
const evalCases: EvalCase[] = [
{
id: "sql_injection",
input: `def get_user(id):\n return db.execute(f'SELECT * FROM users WHERE id = {id}')`,
expectedIssues: ["sql injection", "parameterized"],
severity: "critical",
},
{
id: "clean_function",
input: `def add(a: int, b: int) -> int:\n '''Suma dos enteros.'''\n return a + b`,
expectedIssues: [],
severity: "none",
unexpectedPhrases: ["critical vulnerability"],
},
];
for (const evalCase of evalCases) {
it(`eval: ${evalCase.id}`, async () => {
const output = await runAgentOnCase(evalCase, tempDir);
const result = scoreCase(evalCase, output);
if (!result.passed) {
console.log(`Problemas en ${result.id}:`, result.problems);
}
expect(result.score).toBeGreaterThanOrEqual(0.7);
}, 60_000);
}
it("accuracy total >= 80%", async () => {
const results: CaseResult[] = [];
for (const c of evalCases) {
const output = await runAgentOnCase(c, tempDir);
results.push(scoreCase(c, output));
}
const accuracy = results.filter((r) => r.passed).length / results.length;
expect(accuracy).toBeGreaterThanOrEqual(0.8);
}, 120_000);
});
13. Testing with an Isolated Real Filesystem
Test Project Factories
Integration tests become more powerful when they run against realistic test projects instead of loose files. Factories let you create complex projects reproducibly:
# tests/project_factories.py
import shutil
from pathlib import Path
import tempfile
from typing import Generator
import pytest
def proyecto_python(base_dir: Path) -> Path:
"""
Crea un proyecto Python mínimo pero realista.
Incluye src/, tests/, pyproject.toml.
"""
proyecto = base_dir / "mi_proyecto"
proyecto.mkdir()
(proyecto / "src").mkdir()
(proyecto / "src" / "__init__.py").write_text("")
(proyecto / "src" / "core.py").write_text(
"""from typing import Optional
def procesar_datos(datos: list[dict]) -> list[dict]:
\"\"\"Procesa una lista de registros.\"\"\"
return [
{k: v.strip() if isinstance(v, str) else v for k, v in item.items()}
for item in datos
if item
]
def buscar_por_id(datos: list[dict], id_: int) -> Optional[dict]:
\"\"\"Busca un registro por ID. Retorna None si no existe.\"\"\"
return next((d for d in datos if d.get("id") == id_), None)
"""
)
(proyecto / "tests").mkdir()
(proyecto / "tests" / "__init__.py").write_text("")
(proyecto / "tests" / "test_core.py").write_text(
"""from src.core import procesar_datos, buscar_por_id
def test_procesar_datos_limpia_strings():
datos = [{"nombre": " Alice ", "edad": 30}]
resultado = procesar_datos(datos)
assert resultado[0]["nombre"] == "Alice"
def test_buscar_por_id_existente():
datos = [{"id": 1, "nombre": "Alice"}, {"id": 2, "nombre": "Bob"}]
resultado = buscar_por_id(datos, 1)
assert resultado["nombre"] == "Alice"
def test_buscar_por_id_inexistente():
datos = [{"id": 1}]
resultado = buscar_por_id(datos, 999)
assert resultado is None
"""
)
(proyecto / "pyproject.toml").write_text(
"""[project]
name = "mi-proyecto"
version = "0.1.0"
requires-python = ">=3.11"
[tool.pytest.ini_options]
testpaths = ["tests"]
"""
)
return proyecto
def proyecto_typescript(base_dir: Path) -> Path:
"""
Crea un proyecto TypeScript con src/, tests/, package.json.
"""
proyecto = base_dir / "ts_proyecto"
proyecto.mkdir()
(proyecto / "src").mkdir()
(proyecto / "src" / "utils.ts").write_text(
"""export function capitalize(s: string): string {
if (!s) return s;
return s.charAt(0).toUpperCase() + s.slice(1).toLowerCase();
}
export function groupBy<T>(arr: T[], key: keyof T): Record<string, T[]> {
return arr.reduce((acc, item) => {
const groupKey = String(item[key]);
acc[groupKey] = acc[groupKey] ?? [];
acc[groupKey].push(item);
return acc;
}, {} as Record<string, T[]>);
}
"""
)
(proyecto / "src" / "api.ts").write_text(
"""import { capitalize } from "./utils";
export interface User {
id: number;
name: string;
email: string;
}
export async function fetchUser(id: number): Promise<User | null> {
const response = await fetch(`/api/users/${id}`);
if (!response.ok) return null;
const data = await response.json();
return { ...data, name: capitalize(data.name) };
}
"""
)
(proyecto / "package.json").write_text(
'{"name": "ts-proyecto", "version": "1.0.0", "scripts": {"test": "vitest"}}'
)
return proyecto
def proyecto_con_bugs(base_dir: Path) -> Path:
"""
Crea un proyecto con bugs conocidos para testear el agente de detección.
Bugs incluidos:
- SQL injection en users.py
- Division by zero sin manejo en math_utils.py
- Contraseña hardcodeada en config.py
- Missing return type hints en helpers.py
"""
proyecto = base_dir / "buggy_proyecto"
proyecto.mkdir()
(proyecto / "src").mkdir()
(proyecto / "src" / "users.py").write_text(
"""import sqlite3
conn = sqlite3.connect("users.db")
def get_user_by_name(name):
# BUG: SQL injection
return conn.execute(f"SELECT * FROM users WHERE name = '{name}'").fetchone()
def create_user(name, email):
# BUG: SQL injection
conn.execute(f"INSERT INTO users (name, email) VALUES ('{name}', '{email}')")
conn.commit()
"""
)
(proyecto / "src" / "math_utils.py").write_text(
"""def average(numbers):
# BUG: ZeroDivisionError si numbers está vacío
return sum(numbers) / len(numbers)
def percentage(part, total):
# BUG: ZeroDivisionError si total es 0
return (part / total) * 100
"""
)
(proyecto / "src" / "config.py").write_text(
"""# BUG: secrets hardcodeados
DATABASE_URL = "postgresql://admin:super_secret_password@localhost/prod"
API_KEY = "sk-live-abc123xyz789"
JWT_SECRET = "my-jwt-secret-key"
"""
)
return proyecto
@pytest.fixture
def temp_python_project(tmp_path: Path) -> Generator[Path, None, None]:
"""Fixture: proyecto Python limpio."""
yield proyecto_python(tmp_path)
@pytest.fixture
def temp_typescript_project(tmp_path: Path) -> Generator[Path, None, None]:
"""Fixture: proyecto TypeScript limpio."""
yield proyecto_typescript(tmp_path)
@pytest.fixture
def temp_buggy_project(tmp_path: Path) -> Generator[Path, None, None]:
"""Fixture: proyecto con bugs conocidos."""
yield proyecto_con_bugs(tmp_path)
Agent Tests on Real Projects
# tests/integration/test_agent_on_real_projects.py
import pytest
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage
from tests.project_factories import (
temp_python_project,
temp_buggy_project,
temp_typescript_project,
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agente_detecta_bugs_en_proyecto_real(temp_buggy_project: Path):
"""
El agente de code review detecta todos los bugs conocidos
en el proyecto con bugs predefinidos.
"""
texts: list[str] = []
async for message in query(
prompt="""
Revisa todos los archivos en src/ y lista todos los problemas
de seguridad y calidad que encuentres. Sé específico sobre
qué archivo y qué línea tiene cada problema.
""",
options=ClaudeCodeOptions(
cwd=str(temp_buggy_project),
allowed_tools=["Read"],
model="claude-haiku-4-5",
max_turns=20,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
texts.append(block.text)
output = "\n".join(texts).lower()
# El agente debe detectar los bugs conocidos
assert "sql injection" in output or "sql" in output, "No detectó SQL injection"
assert "hardcoded" in output or "password" in output or "secret" in output, (
"No detectó secrets hardcodeados"
)
assert "division" in output or "zero" in output or "empty" in output, (
"No detectó potencial division por cero"
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agente_no_modifica_archivos_en_modo_lectura(temp_python_project: Path):
"""
El agente con solo permisos de lectura no puede modificar archivos.
"""
# Guardar estado inicial
archivos_antes = {
f.relative_to(temp_python_project): f.read_text()
for f in temp_python_project.rglob("*.py")
}
async for _ in query(
prompt="Sugiere mejoras al código en src/. Implementa los cambios.",
options=ClaudeCodeOptions(
cwd=str(temp_python_project),
allowed_tools=["Read"], # Solo lectura — no puede escribir
max_turns=15,
),
):
pass
# Verificar que ningún archivo fue modificado
archivos_despues = {
f.relative_to(temp_python_project): f.read_text()
for f in temp_python_project.rglob("*.py")
}
assert archivos_antes == archivos_despues, (
"El agente modificó archivos sin permiso de escritura"
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agente_agrega_tests_faltantes(temp_python_project: Path):
"""
El agente puede agregar tests para funciones sin cobertura.
"""
async for _ in query(
prompt="Agrega tests para la función buscar_por_id en tests/test_core.py",
options=ClaudeCodeOptions(
cwd=str(temp_python_project),
allowed_tools=["Read", "Edit"],
max_turns=15,
),
):
pass
test_content = (temp_python_project / "tests" / "test_core.py").read_text()
assert "buscar_por_id" in test_content
@pytest.mark.integration
@pytest.mark.asyncio
async def test_agente_respeta_estructura_typescript(temp_typescript_project: Path):
"""
El agente en un proyecto TypeScript lee y responde apropiadamente.
"""
texts: list[str] = []
async for message in query(
prompt="¿Cuántas funciones exportadas hay en src/utils.ts?",
options=ClaudeCodeOptions(
cwd=str(temp_typescript_project),
allowed_tools=["Read"],
max_turns=5,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
texts.append(block.text)
output = " ".join(texts)
# utils.ts tiene 2 funciones: capitalize y groupBy
assert "2" in output or "dos" in output.lower() or "capitalize" in output
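The read-only test above compares the text of `*.py` files only, so a newly created file, a deleted file, or a change to a non-Python file would go unnoticed. A broader check could hash every file in the project before and after the run. The sketch below shows one way to do that; `snapshot_tree` and `diff_trees` are hypothetical helper names.

```python
import hashlib
from pathlib import Path


def snapshot_tree(root: Path) -> dict[str, str]:
    """Map every file under root (relative POSIX path) to the SHA-256 of its bytes."""
    return {
        path.relative_to(root).as_posix(): hashlib.sha256(path.read_bytes()).hexdigest()
        for path in sorted(root.rglob("*"))
        if path.is_file()
    }


def diff_trees(before: dict[str, str], after: dict[str, str]) -> dict[str, list[str]]:
    """Report which files were added, removed or modified between two snapshots."""
    return {
        "added": sorted(after.keys() - before.keys()),
        "removed": sorted(before.keys() - after.keys()),
        "modified": sorted(
            p for p in before.keys() & after.keys() if before[p] != after[p]
        ),
    }
```

In a test, `before = snapshot_tree(project)` would be taken ahead of the agent run, and the assertion would be that `diff_trees(before, snapshot_tree(project))` has three empty lists. On failure the diff names the offending files, which is more useful than a bare dictionary mismatch.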
14. Contract Testing para Agentes
Contratos de Comportamiento
El contract testing verifica que el agente cumple invariantes definidos, independientemente de cómo resuelva la tarea. Un “contrato” es una propiedad que siempre debe ser verdadera:
flowchart LR
    Agente["Agent"] --> |produces| Output["Output"]
    Output --> |checked by| Contrato1["Contract 1:<br/>No edits outside cwd"]
    Output --> |checked by| Contrato2["Contract 2:<br/>Output is valid JSON"]
    Output --> |checked by| Contrato3["Contract 3:<br/>No leaking of sensitive information"]
    Output --> |checked by| Contrato4["Contract 4:<br/>Always end with ResultMessage"]
    Contrato1 --> |PASSES| Verde["Contract upheld"]
    Contrato2 --> |FAILS| Rojo["Contract violated"]
Implementation with pytest
# tests/contracts/test_agent_contracts.py
import pytest
import os
import json
import tempfile
import shutil
from pathlib import Path
from unittest.mock import patch, MagicMock, AsyncMock
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, ResultMessage
class AgentContractChecker:
    """
    Checks behavior contracts for the agent.
    Reusable with any agent built on the SDK.
    """

    def __init__(self):
        self.violations: list[str] = []
        self.files_written: list[str] = []
        self.tools_called: list[str] = []
        self.messages: list[object] = []

    def record_message(self, message) -> None:
        """Record a stream message for later analysis."""
        self.messages.append(message)
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "name"):  # tool-use blocks carry the tool name
                    self.tools_called.append(block.name)
                    if block.name in ("Write", "Edit", "MultiEdit"):
                        path = block.input.get("file_path", "")
                        self.files_written.append(path)

    def check_sandboxing(self, cwd: str) -> None:
        """Contract: the agent must not write outside cwd."""
        root = os.path.realpath(cwd)
        for path in self.files_written:
            # Resolve relative paths against the agent's cwd (not the test
            # process's) and collapse ".." and symlinks before comparing.
            resolved = os.path.realpath(os.path.join(root, path))
            # commonpath compares whole components, so "/tmp/app-evil" is not
            # mistaken for a child of "/tmp/app" the way startswith() would be.
            if os.path.commonpath([root, resolved]) != root:
                self.violations.append(
                    f"VIOLACION SANDBOXING: Intentó escribir en '{path}' fuera de '{cwd}'"
                )

    def check_always_terminates(self) -> None:
        """Contract: the stream must always end with a ResultMessage."""
        if not self.messages:
            self.violations.append("VIOLACION: El agente no produjo ningún mensaje")
            return
        last = self.messages[-1]
        if not isinstance(last, ResultMessage):
            self.violations.append(
                f"VIOLACION TERMINACION: Último mensaje es {type(last).__name__}, no ResultMessage"
            )

    def check_used_allowed_tools_only(self, allowed_tools: list[str]) -> None:
        """Contract: the agent only uses allowed tools."""
        for tool in self.tools_called:
            if tool not in allowed_tools:
                self.violations.append(
                    f"VIOLACION HERRAMIENTAS: Usó '{tool}' que no está en {allowed_tools}"
                )

    def assert_no_violations(self) -> None:
        """Fail the current test (via pytest.fail) if any contract was violated."""
        if self.violations:
            violations_text = "\n".join(f" - {v}" for v in self.violations)
            pytest.fail(f"Violaciones de contrato detectadas:\n{violations_text}")
@pytest.fixture
def checker():
return AgentContractChecker()
@pytest.mark.integration
@pytest.mark.asyncio
async def test_contrato_sandboxing(checker: AgentContractChecker, tmp_path: Path):
"""
El agente con allowedTools de escritura solo puede escribir dentro del cwd.
"""
allowed = ["Read", "Write", "Edit"]
cwd = str(tmp_path)
async for message in query(
prompt="Crea el archivo output.txt con el texto 'hola'",
options=ClaudeCodeOptions(
cwd=cwd,
allowed_tools=allowed,
max_turns=10,
),
):
checker.record_message(message)
checker.check_sandboxing(cwd)
checker.check_always_terminates()
checker.check_used_allowed_tools_only(allowed)
checker.assert_no_violations()
@pytest.mark.integration
@pytest.mark.asyncio
async def test_contrato_lectura_no_modifica(checker: AgentContractChecker, tmp_path: Path):
"""
El agente con solo Read nunca debe intentar Write o Edit.
"""
(tmp_path / "data.txt").write_text("Contenido de prueba")
allowed = ["Read"]
async for message in query(
prompt="Lee data.txt y resume su contenido. Luego guarda el resumen en resumen.txt.",
options=ClaudeCodeOptions(
cwd=str(tmp_path),
allowed_tools=allowed,
max_turns=10,
),
):
checker.record_message(message)
checker.check_used_allowed_tools_only(allowed)
checker.assert_no_violations()
# Verificar que resumen.txt NO fue creado (el agente no tiene permiso)
assert not (tmp_path / "resumen.txt").exists()
@pytest.mark.integration
@pytest.mark.asyncio
async def test_contrato_output_json_valido(tmp_path: Path):
"""
Si el agente promete devolver JSON, el output debe ser JSON válido.
"""
texts: list[str] = []
async for message in query(
prompt="""Analiza src/. Responde SOLO con JSON válido en este formato:
{"files": ["lista de archivos"], "issues": ["lista de issues"], "score": 0.0-1.0}
No incluyas texto adicional fuera del JSON.""",
options=ClaudeCodeOptions(
cwd=str(tmp_path),
allowed_tools=["Read"],
max_turns=5,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
texts.append(block.text)
full_output = "\n".join(texts)
# Extraer JSON del output
try:
start = full_output.find("{")
end = full_output.rfind("}") + 1
parsed = json.loads(full_output[start:end])
assert "score" in parsed, "JSON no tiene campo 'score'"
except (json.JSONDecodeError, ValueError) as e:
pytest.fail(f"El agente no produjo JSON válido: {e}\nOutput: {full_output[:200]}")
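Contract 3 in the diagram (no leaking of sensitive information) has no check in `AgentContractChecker`. One possible sketch scans the agent's text output for secret-shaped strings. The function name `find_secret_leaks` and the pattern list are assumptions made for illustration; a real deployment would tune the patterns to its own credential formats.

```python
import re

# Illustrative patterns only (assumption): extend with your own credential formats.
SECRET_PATTERNS = {
    "anthropic_api_key": re.compile(r"sk-ant-[A-Za-z0-9_\-]{8,}"),
    "aws_access_key_id": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
    "private_key_block": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
}

def find_secret_leaks(texts: list[str]) -> list[str]:
    """Return one violation message per secret-shaped match in the agent's output."""
    violations = []
    for text in texts:
        for label, pattern in SECRET_PATTERNS.items():
            for match in pattern.finditer(text):
                # Report only a short prefix so the test log does not leak the secret again.
                prefix = match.group()[:6]
                violations.append(f"VIOLACION SECRETOS: {label} ({prefix}...)")
    return violations
```

A test can pass it the same `texts` list collected from `AssistantMessage` blocks and extend `checker.violations` with the result before calling `assert_no_violations()`.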
Property-Based Contract Testing
# tests/contracts/test_properties_contracts.py
from hypothesis import given, strategies as st, settings
from src.hooks.security_hook import create_security_hook
# Invariante: el hook siempre retorna None o un dict con clave "deny"
@given(
tool_name=st.sampled_from(["Write", "Edit", "MultiEdit", "Read", "Bash"]),
file_path=st.text(min_size=0, max_size=200),
)
@settings(max_examples=50)
def test_contrato_hook_formato_de_retorno(tool_name: str, file_path: str):
"""El hook siempre retorna None o dict con 'deny' (nunca un formato inesperado)."""
hook = create_security_hook(allowed_dirs=["/proyecto", "/tmp"])
result = hook(tool_name, {"file_path": file_path})
# Invariante: solo puede ser None o dict con deny
assert result is None or (
isinstance(result, dict) and "deny" in result
), f"Hook retornó formato inesperado: {type(result)} = {result}"
# Invariante: la denegación siempre es idempotente
@given(
file_path=st.text(min_size=1, max_size=100).filter(
lambda p: not p.startswith("/proyecto") and not p.startswith("/tmp")
)
)
def test_contrato_denegacion_idempotente(file_path: str):
"""Llamar el hook dos veces con el mismo input siempre da el mismo resultado."""
hook = create_security_hook(allowed_dirs=["/proyecto", "/tmp"])
result1 = hook("Write", {"file_path": file_path})
result2 = hook("Write", {"file_path": file_path})
assert result1 == result2 or (
result1 is not None and result2 is not None and result1.get("deny") == result2.get("deny")
)
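`create_security_hook` is imported from `src/hooks/security_hook.py`, which this section does not show. The stand-in below is an assumption about its shape, written only so that the two invariants can be exercised offline. The randomized loop uses the standard library rather than Hypothesis to keep the sketch free of dependencies.

```python
import os
import random
import string
from typing import Callable, Optional

WRITING_TOOLS = {"Write", "Edit", "MultiEdit"}

def create_security_hook(allowed_dirs: list[str]) -> Callable[[str, dict], Optional[dict]]:
    """Hypothetical stand-in: deny writes whose normalized path is outside allowed_dirs."""
    roots = [os.path.normpath(d) for d in allowed_dirs]

    def hook(tool_name: str, tool_input: dict) -> Optional[dict]:
        if tool_name not in WRITING_TOOLS:
            return None  # read-only tools are never blocked by this hook
        path = os.path.normpath(tool_input.get("file_path", ""))
        inside = any(path == root or path.startswith(root + os.sep) for root in roots)
        return None if inside else {"deny": True, "reason": f"Fuera de {roots}: {path}"}

    return hook

def check_invariants(samples: int = 200, seed: int = 0) -> None:
    """Randomized check of both contracts: return shape and idempotence."""
    rng = random.Random(seed)
    hook = create_security_hook(["/proyecto", "/tmp"])
    alphabet = string.ascii_letters + "/._-"
    for _ in range(samples):
        tool = rng.choice(["Write", "Edit", "MultiEdit", "Read", "Bash"])
        path = "".join(rng.choice(alphabet) for _ in range(rng.randint(0, 40)))
        first = hook(tool, {"file_path": path})
        # Invariant 1: the result is None or a dict carrying "deny".
        assert first is None or (isinstance(first, dict) and "deny" in first)
        # Invariant 2: the same input always yields the same decision.
        assert hook(tool, {"file_path": path}) == first
```

Hypothesis remains the better tool once it is installed, because it shrinks failing inputs to a minimal counterexample; the loop above only shows what the properties assert.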
15. Performance Benchmarking
Measuring Real Latency and Cost
Before choosing a model, measure actual cost and latency for your specific use case. Anthropic's general benchmarks are not enough, because cost depends on the size of your own prompts:
# benchmarks/model_benchmark.py
import asyncio
import time
from dataclasses import dataclass, field
from claude_code_sdk import query, ClaudeCodeOptions, ResultMessage, AssistantMessage
@dataclass
class BenchmarkResult:
model: str
task_id: str
latency_ms: float
cost_usd: float
tokens_input: int
tokens_output: int
success: bool
output_length: int
@dataclass
class BenchmarkSuite:
results: list[BenchmarkResult] = field(default_factory=list)
def add(self, result: BenchmarkResult) -> None:
self.results.append(result)
def summary_by_model(self) -> dict[str, dict]:
"""Estadísticas agregadas por modelo."""
from collections import defaultdict
by_model: dict[str, list[BenchmarkResult]] = defaultdict(list)
for r in self.results:
by_model[r.model].append(r)
summary = {}
for model, results in by_model.items():
successful = [r for r in results if r.success]
summary[model] = {
"total_runs": len(results),
"success_rate": len(successful) / max(len(results), 1),
"avg_latency_ms": sum(r.latency_ms for r in results) / max(len(results), 1),
# Nearest-rank P95: the ceil(0.95 * n)-th smallest latency, computed with integer arithmetic
"p95_latency_ms": sorted(r.latency_ms for r in results)[(95 * len(results) + 99) // 100 - 1] if results else 0,
"avg_cost_usd": sum(r.cost_usd for r in results) / max(len(results), 1),
"total_cost_usd": sum(r.cost_usd for r in results),
"avg_tokens_in": sum(r.tokens_input for r in results) / max(len(results), 1),
"avg_tokens_out": sum(r.tokens_output for r in results) / max(len(results), 1),
}
return summary
def print_comparison_table(self) -> None:
"""Imprime tabla comparativa de modelos."""
summary = self.summary_by_model()
print("\n=== Benchmark de Modelos ===")
header = f"{'Modelo':<30} {'Éxito':>8} {'Lat(ms)':>10} {'P95(ms)':>10} {'Costo$':>10} {'Total$':>10}"
print(header)
print("-" * len(header))
for model, stats in sorted(summary.items()):
print(
f"{model:<30} "
f"{stats['success_rate']:>8.1%} "
f"{stats['avg_latency_ms']:>10.0f} "
f"{stats['p95_latency_ms']:>10.0f} "
f"{stats['avg_cost_usd']:>10.5f} "
f"{stats['total_cost_usd']:>10.4f}"
)
def recommend_model(self, max_latency_ms: float = 5000, max_cost_usd: float = 0.01) -> str:
"""Recomienda el modelo óptimo dados los constraints."""
summary = self.summary_by_model()
candidates = [
(model, stats)
for model, stats in summary.items()
if stats["avg_latency_ms"] <= max_latency_ms
and stats["avg_cost_usd"] <= max_cost_usd
and stats["success_rate"] >= 0.9
]
if not candidates:
return "No hay modelo que cumpla todos los constraints"
# Elegir el más barato entre los candidatos válidos
best = min(candidates, key=lambda x: x[1]["avg_cost_usd"])
return best[0]
BENCHMARK_TASKS = [
{
"id": "simple_read",
"prompt": "¿Cuántas líneas tiene este archivo?",
"tools": ["Read"],
"max_turns": 5,
},
{
"id": "code_analysis",
"prompt": "Analiza src/ y lista todas las funciones definidas.",
"tools": ["Read"],
"max_turns": 10,
},
{
"id": "refactor_suggestion",
"prompt": "Sugiere 3 mejoras concretas para el código en src/. No implementes, solo sugiere.",
"tools": ["Read"],
"max_turns": 8,
},
]
MODELS_TO_BENCHMARK = [
"claude-haiku-4-5",
"claude-sonnet-4-5",
]
async def run_benchmark_task(
task: dict,
model: str,
cwd: str,
repeat: int = 3,
) -> list[BenchmarkResult]:
"""Ejecuta una tarea varias veces para obtener estadísticas."""
results = []
for run in range(repeat):
start_time = time.perf_counter()
texts: list[str] = []
result_msg = None
success = True
try:
async for message in query(
prompt=task["prompt"],
options=ClaudeCodeOptions(
cwd=cwd,
allowed_tools=task["tools"],
model=model,
max_turns=task["max_turns"],
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if hasattr(block, "text"):
texts.append(block.text)
if isinstance(message, ResultMessage):
result_msg = message
except Exception:
success = False
elapsed_ms = (time.perf_counter() - start_time) * 1000
cost = getattr(result_msg, "total_cost_usd", 0.0) or 0.0
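usage = getattr(result_msg, "usage", None) or {}
# ResultMessage.usage is a plain dict in claude_code_sdk, so read keys rather than attributes
tokens_in = usage.get("input_tokens", 0) if isinstance(usage, dict) else getattr(usage, "input_tokens", 0)
tokens_out = usage.get("output_tokens", 0) if isinstance(usage, dict) else getattr(usage, "output_tokens", 0)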
results.append(BenchmarkResult(
model=model,
task_id=task["id"],
latency_ms=elapsed_ms,
cost_usd=cost,
tokens_input=tokens_in,
tokens_output=tokens_out,
success=success and result_msg is not None,
output_length=len("\n".join(texts)),
))
print(f" Run {run + 1}: {elapsed_ms:.0f}ms, ${cost:.5f}")
return results
async def main_benchmark(cwd: str = "/tmp", repeat: int = 3) -> BenchmarkSuite:
"""Ejecuta el benchmark completo."""
suite = BenchmarkSuite()
for task in BENCHMARK_TASKS:
print(f"\n=== Tarea: {task['id']} ===")
for model in MODELS_TO_BENCHMARK:
print(f" Modelo: {model}")
results = await run_benchmark_task(task, model, cwd, repeat)
for r in results:
suite.add(r)
suite.print_comparison_table()
recommended = suite.recommend_model(max_latency_ms=3000, max_cost_usd=0.005)
print(f"\nModelo recomendado (latencia < 3s, costo < $0.005): {recommended}")
return suite
if __name__ == "__main__":
asyncio.run(main_benchmark())
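With only a handful of runs per model, the way P95 is computed matters. The sketch below uses the nearest-rank definition (the ceil(p·n)-th smallest sample) with integer arithmetic, which avoids floating-point rounding at exact boundaries. The function name is illustrative.

```python
def nearest_rank_percentile(values: list[float], percent: int) -> float:
    """Nearest-rank percentile: the ceil(percent/100 * n)-th smallest value."""
    if not values:
        raise ValueError("percentile of an empty sample is undefined")
    if not 0 < percent <= 100:
        raise ValueError("percent must be in (0, 100]")
    ordered = sorted(values)
    # Integer form of ceil(percent * n / 100).
    rank = (percent * len(ordered) + 99) // 100
    return ordered[rank - 1]
```

With three tasks and `repeat=3` there are nine samples per model, so P95 is simply the slowest run. Raise `repeat` before reading much into the tail.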
Example Results
The following table illustrates typical results from a real benchmark on code-analysis tasks:
| Model | Success | Avg latency | P95 | Avg cost | Total (30 runs) |
|---|---|---|---|---|---|
| claude-haiku-4-5 | 100% | 1,240 ms | 2,100 ms | $0.00012 | $0.0036 |
| claude-sonnet-4-5 | 100% | 2,890 ms | 4,200 ms | $0.00087 | $0.0261 |
Conclusion: for read and analysis tasks, haiku is roughly 7x cheaper and 2.3x faster. For critical code that needs multiple iterations, sonnet is preferable for its higher quality. The right choice depends on the specific use case.
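The two ratios in the conclusion come straight from the table. The short calculation below reproduces them so the same comparison can be rerun on your own numbers; the dictionary simply restates the example rows.

```python
# Example rows copied from the table (average latency in ms, average cost in USD).
results = {
    "claude-haiku-4-5": {"avg_latency_ms": 1240, "avg_cost_usd": 0.00012},
    "claude-sonnet-4-5": {"avg_latency_ms": 2890, "avg_cost_usd": 0.00087},
}

def ratio(metric: str, numerator: str, denominator: str) -> float:
    """How many times larger `metric` is for one model than for the other."""
    return results[numerator][metric] / results[denominator][metric]

cost_ratio = ratio("avg_cost_usd", "claude-sonnet-4-5", "claude-haiku-4-5")
latency_ratio = ratio("avg_latency_ms", "claude-sonnet-4-5", "claude-haiku-4-5")
print(f"cost: {cost_ratio:.2f}x, latency: {latency_ratio:.2f}x")
# cost: 7.25x, latency: 2.33x
```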
16. Testing in CI/CD with GitHub Actions
Complete Workflow with 3 Levels
# .github/workflows/agent-tests-full.yml
name: Agent Tests - Full Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
schedule:
- cron: "0 3 * * *" # Nightly a las 3am UTC
env:
PYTHON_VERSION: "3.12"
NODE_VERSION: "20"
jobs:
# ─── Nivel 1: Unit Tests (gratis, en cada PR) ──────────────────
unit-tests:
name: "Unit Tests (sin API)"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: pip
- name: Instalar dependencias
run: pip install -r requirements-dev.txt
- name: Unit tests Python
run: pytest -m "unit" -v --tb=short --junit-xml=reports/unit-python.xml
- uses: actions/setup-node@v4
with:
node-version: ${{ env.NODE_VERSION }}
cache: npm
- name: Instalar dependencias Node
run: npm ci
- name: Unit tests TypeScript
run: npx vitest run --reporter=verbose --reporter=junit --outputFile=reports/unit-ts.xml
- name: Publicar resultados
uses: mikepenz/action-junit-report@v4
if: always()
with:
report_paths: "reports/*.xml"
# ─── Nivel 2: Integration Tests (SDK real, filesystem aislado) ─
integration-tests:
name: "Integration Tests"
runs-on: ubuntu-latest
needs: unit-tests
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: pip
- name: Instalar dependencias Python
run: pip install -r requirements-dev.txt
- name: Instalar Claude Code CLI
run: npm install -g @anthropic-ai/claude-code
- name: Integration tests
run: pytest -m "integration" -v --tb=short --timeout=120
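# No ANTHROPIC_API_KEY in this job: it only passes if the integration tests stub the model call. Tests that call query() unmocked need the key and belong in the e2e job.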
- name: Guardar artifacts de test
uses: actions/upload-artifact@v4
if: failure()
with:
name: integration-test-failures
path: tests/integration/failures/
# ─── Level 3: E2E Tests (real API: nightly, pushes to main, or [e2e] in the commit message) ─
e2e-tests:
name: "E2E Tests (API real)"
runs-on: ubuntu-latest
needs: integration-tests
if: |
github.event_name == 'schedule' ||
contains(github.event.head_commit.message, '[e2e]') ||
github.event_name == 'push' && github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: pip
- name: Instalar dependencias
run: |
pip install -r requirements-dev.txt
npm install -g @anthropic-ai/claude-code
- name: E2E tests
run: pytest -m "e2e" -v --tb=short --timeout=300
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
- name: Reporte de costos
if: always()
run: python scripts/report_test_costs.py reports/e2e-costs.json || true
- name: Publicar reporte de costos
uses: actions/upload-artifact@v4
if: always()
with:
name: e2e-cost-report
path: reports/e2e-costs.json
# ─── Notificación en caso de fallo de E2E ─────────────────────
notify-on-failure:
name: "Notificar fallo E2E"
runs-on: ubuntu-latest
needs: e2e-tests
if: failure()
permissions:
issues: write # lets the default GITHUB_TOKEN create the alert issue
steps:
- name: Crear issue de alerta
uses: actions/github-script@v7
with:
script: |
await github.rest.issues.create({
owner: context.repo.owner,
repo: context.repo.repo,
title: `[ALERTA] E2E tests fallaron - ${new Date().toISOString().slice(0,10)}`,
body: `Los tests E2E fallaron en el run: ${context.runId}\n\nVer: ${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`,
labels: ['bug', 'e2e-failure']
});
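The workflow calls `scripts/report_test_costs.py`, which is not shown in this chapter. A minimal sketch of what such a script might do follows. The JSON layout (a list of objects with `test` and `cost_usd` keys) and the budget threshold are assumptions; adapt them to whatever your E2E tests actually write.

```python
import json
import sys
from pathlib import Path

def summarize_costs(entries: list[dict], budget_usd: float = 1.0) -> dict:
    """Aggregate per-test costs and flag whether the run exceeded the budget."""
    total = sum(float(e.get("cost_usd", 0.0)) for e in entries)
    most_expensive = max(entries, key=lambda e: e.get("cost_usd", 0.0), default=None)
    return {
        "tests": len(entries),
        "total_usd": round(total, 6),
        "most_expensive": most_expensive.get("test") if most_expensive else None,
        "over_budget": total > budget_usd,
    }

def main(argv: list[str]) -> int:
    """Print a summary for the cost file named in argv[1]; return a process exit code."""
    if len(argv) != 2:
        print("usage: report_test_costs.py <costs.json>", file=sys.stderr)
        return 2
    path = Path(argv[1])
    if not path.exists():
        print(f"no cost file at {path}; nothing to report")
        return 0
    summary = summarize_costs(json.loads(path.read_text()))
    print(json.dumps(summary, indent=2))
    return 1 if summary["over_budget"] else 0
```

As a script, it would end with the usual `if __name__ == "__main__": sys.exit(main(sys.argv))` guard. The workflow step appends `|| true`, so a non-zero exit here is informational and does not fail the job.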
Configuring Secrets in GitHub
To run the E2E tests in CI you need to store the API key as a repository secret:
# Via GitHub CLI (prompts for the value, so the key never lands in shell history)
gh secret set ANTHROPIC_API_KEY --repo owner/repo
# Verify that it is configured
gh secret list --repo owner/repo
README Badge
[![Agent Tests](https://github.com/owner/repo/actions/workflows/agent-tests-full.yml/badge.svg)](https://github.com/owner/repo/actions/workflows/agent-tests-full.yml)
Cache Optimization in CI
# Snippet to speed up runs with the pip cache
- uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
cache-dependency-path: |
requirements.txt
requirements-dev.txt
# npm cache (setup-node caches npm's download cache, not node_modules)
- uses: actions/setup-node@v4
with:
node-version: "20"
cache: npm
17. Mutation Testing for Agents
What Mutation Testing Means for Agents
Traditional mutation testing modifies the source code to check that the tests notice the change. For agents, we apply the same idea to the system_prompt or to the agent's logic: if an important change in behavior slips past the tests, the tests are insufficient.
flowchart TD
    Original["Original System Prompt"] --> |mutate| Mutante1["Mutant 1:<br/>Remove security instruction"]
    Original --> |mutate| Mutante2["Mutant 2:<br/>Change attempt limit"]
    Original --> |mutate| Mutante3["Mutant 3:<br/>Remove tool restriction"]
    Mutante1 --> Tests["Test Suite"]
    Mutante2 --> Tests
    Mutante3 --> Tests
    Tests --> |DETECTS change| Killed["Mutant 'killed'<br/>(tests are good)"]
    Tests --> |does NOT detect| Survived["Mutant 'survives'<br/>(tests are weak)"]
Implementation
# tests/mutation/test_prompt_mutations.py
import pytest
from unittest.mock import patch, AsyncMock
from dataclasses import dataclass
from tests.project_factories import proyecto_con_bugs
from pathlib import Path
ORIGINAL_SYSTEM_PROMPT = """
Eres un agente de code review. Debes:
1. SIEMPRE identificar SQL injection
2. SIEMPRE identificar secrets hardcodeados
3. Nunca modificar archivos (solo lectura)
4. Máximo 3 intentos por archivo
"""
MUTANTES = [
{
"id": "eliminar_sql_injection",
"prompt": """
Eres un agente de code review. Debes:
1. Identificar secrets hardcodeados
2. Nunca modificar archivos (solo lectura)
3. Máximo 3 intentos por archivo
""",
"debe_fallar_en": ["detecta_sql_injection"],
},
{
"id": "eliminar_restriccion_lectura",
"prompt": """
Eres un agente de code review. Debes:
1. SIEMPRE identificar SQL injection
2. SIEMPRE identificar secrets hardcodeados
3. Máximo 3 intentos por archivo
""",
"debe_fallar_en": ["no_modifica_archivos"],
},
{
"id": "aumentar_intentos",
"prompt": """
Eres un agente de code review. Debes:
1. SIEMPRE identificar SQL injection
2. SIEMPRE identificar secrets hardcodeados
3. Nunca modificar archivos (solo lectura)
4. Máximo 10 intentos por archivo
""",
"debe_fallar_en": [], # Este mutante es "benigno" — los tests no deben detectarlo
},
]
async def run_agent_with_prompt(system_prompt: str, cwd: str) -> str:
    """Run the agent with a specific system prompt and return its text output."""
    from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage

    texts: list[str] = []
    async for message in query(
        prompt="Analiza todos los archivos en src/.",
        options=ClaudeCodeOptions(
            cwd=cwd,
            # Pass the (possibly mutated) text as the system prompt itself,
            # not as a prefix of the user prompt.
            system_prompt=system_prompt,
            allowed_tools=["Read"],
            max_turns=10,
        ),
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "text"):
                    texts.append(block.text)
    return "\n".join(texts)
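@pytest.mark.e2e
def test_sql_injection_detectado_con_prompt_original(tmp_path: Path):
    """Check that the original prompt detects SQL injection."""
    # Placeholder: the real check calls the API, so it belongs in the e2e tier.
    # Skip explicitly; a bare `pass` would report a passing test that verified nothing.
    pytest.skip("Pendiente: ejecutar run_agent_with_prompt contra proyecto_con_bugs")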
def test_mutacion_sql_injection_sobrevive_si_tests_debiles():
"""
Documenta que si eliminamos la instrucción de SQL injection,
la suite debe detectarlo. Si no lo detecta, la suite es débil.
"""
# Este test es documentación: asegura que alguien piense
# en cubrir el caso de SQL injection en los tests de comportamiento.
mutante = MUTANTES[0]
assert "detecta_sql_injection" in mutante["debe_fallar_en"], (
"El mutante 'eliminar_sql_injection' debería ser detectado por los tests"
)
@pytest.mark.parametrize("mutante", MUTANTES, ids=lambda m: m["id"])
def test_mutante_tiene_cobertura_esperada(mutante: dict):
"""
Verifica que cada mutante maligno tiene tests que lo detectan.
Los mutantes benignos no necesitan tests.
"""
tests_existentes = [
"detecta_sql_injection",
"no_modifica_archivos",
"detecta_secrets_hardcodeados",
]
for test_name in mutante["debe_fallar_en"]:
assert test_name in tests_existentes, (
f"Mutante '{mutante['id']}' requiere test '{test_name}' "
f"pero no existe en la suite. Agrega el test."
)
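The tests above only check that each mutant is declared as covered; they never run a mutant. As a sketch of the full loop, the code below derives mutants by deleting one numbered rule at a time and scores a suite against them. `run_suite` is a hypothetical callable that returns the names of the tests that failed for a given prompt. Here it is replaced by an offline stub, so the example costs nothing to run.

```python
from typing import Callable

def drop_rule_mutants(system_prompt: str) -> dict[str, str]:
    """Build one mutant per numbered rule by removing that rule from the prompt."""
    lines = system_prompt.strip().splitlines()
    mutants = {}
    for i, line in enumerate(lines):
        if line.strip()[:1].isdigit():  # treat "1. ..." style lines as rules
            mutants[f"drop_line_{i}"] = "\n".join(lines[:i] + lines[i + 1:])
    return mutants

def mutation_score(
    mutants: dict[str, str],
    run_suite: Callable[[str], set[str]],
) -> tuple[float, list[str]]:
    """Return (killed / total, ids of surviving mutants)."""
    # A mutant survives when the suite reports no failing test for it.
    survivors = [mutant_id for mutant_id, prompt in mutants.items() if not run_suite(prompt)]
    killed = len(mutants) - len(survivors)
    return (killed / len(mutants) if mutants else 1.0), survivors

# Offline stub (assumption): a test "fails" when the rule it relies on is missing.
def fake_suite(prompt: str) -> set[str]:
    failed = set()
    if "SQL injection" not in prompt:
        failed.add("detecta_sql_injection")
    if "Nunca modificar" not in prompt:
        failed.add("no_modifica_archivos")
    return failed

PROMPT = """Eres un agente de code review. Debes:
1. SIEMPRE identificar SQL injection
2. SIEMPRE identificar secrets hardcodeados
3. Nunca modificar archivos (solo lectura)
4. Máximo 3 intentos por archivo"""

score, survivors = mutation_score(drop_rule_mutants(PROMPT), fake_suite)
print(f"mutation score: {score:.0%}, survivors: {survivors}")
# mutation score: 50%, survivors: ['drop_line_2', 'drop_line_4']
```

The two survivors read differently. `drop_line_4` removes the attempt limit, the kind of mutant the chapter treats as benign. `drop_line_2` removes the hardcoded-secrets rule and survives only because the stub has no test for it, which is exactly the gap mutation testing is meant to expose.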
18. Complete Hook Testing
Unit Tests for PreToolUse Hooks
Hooks are critical components of the system. They deserve thorough test coverage, including edge cases:
# tests/hooks/test_pre_tool_hooks.py
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from typing import Optional
# ─── Hook de Rate Limiting ─────────────────────────────────────
class RateLimitHook:
"""Hook que limita el número de llamadas a una herramienta por sesión."""
def __init__(self, limits: dict[str, int]):
self.limits = limits
self._counters: dict[str, int] = {}
def pre_tool_use(self, tool_name: str, tool_input: dict) -> Optional[dict]:
count = self._counters.get(tool_name, 0)
limit = self.limits.get(tool_name)
if limit is not None and count >= limit:
return {
"deny": True,
"reason": f"Rate limit alcanzado para {tool_name}: {count}/{limit}"
}
self._counters[tool_name] = count + 1
return None
def reset(self) -> None:
self._counters.clear()
class TestRateLimitHook:
@pytest.fixture
def hook(self):
return RateLimitHook(limits={"Bash": 3, "Write": 2})
def test_permite_llamadas_dentro_del_limite(self, hook):
for _ in range(3):
result = hook.pre_tool_use("Bash", {"command": "ls"})
assert result is None
def test_deniega_cuando_supera_limite(self, hook):
for _ in range(3):
hook.pre_tool_use("Bash", {"command": "ls"})
result = hook.pre_tool_use("Bash", {"command": "ls"})
assert result is not None
assert result["deny"] is True
assert "3/3" in result["reason"]
def test_herramientas_sin_limite_siempre_pasan(self, hook):
for _ in range(100):
result = hook.pre_tool_use("Read", {"file_path": "file.py"})
assert result is None
def test_contadores_independientes_por_herramienta(self, hook):
for _ in range(3):
hook.pre_tool_use("Bash", {"command": "ls"})
# Bash está en límite, pero Write todavía no
assert hook.pre_tool_use("Bash", {}) is not None
assert hook.pre_tool_use("Write", {"file_path": "f.txt"}) is None
def test_reset_reinicia_contadores(self, hook):
for _ in range(3):
hook.pre_tool_use("Bash", {})
assert hook.pre_tool_use("Bash", {}) is not None # Denegado
hook.reset()
assert hook.pre_tool_use("Bash", {}) is None # Permitido tras reset
# ─── Hook de Validación de Input ──────────────────────────────
import re

class InputValidationHook:
    """Hook that validates tool input before the tool runs."""
    BLOCKED_PATTERNS = ["rm -rf /", "DROP TABLE", "DELETE FROM users", "> /dev/null 2>&1 &"]
    # A name ending in "password" assigned a quoted literal, e.g. db_password = 'x'.
    # Lookups such as os.environ.get('DB_PASSWORD') do not match.
    HARDCODED_PASSWORD = re.compile(r"password\w*\s*=\s*['\"]", re.IGNORECASE)

    def pre_tool_use(self, tool_name: str, tool_input: dict) -> Optional[dict]:
        if tool_name == "Bash":
            command = tool_input.get("command", "")
            for pattern in self.BLOCKED_PATTERNS:
                if pattern in command:
                    return {
                        "deny": True,
                        "reason": f"Comando bloqueado por política: contiene '{pattern}'"
                    }
        if tool_name in ("Write", "Edit"):
            content = tool_input.get("content", "") or tool_input.get("new_string", "")
            if self.HARDCODED_PASSWORD.search(content):
                return {
                    "deny": True,
                    "reason": "No se permiten contraseñas hardcodeadas en el código"
                }
        return None
class TestInputValidationHook:
@pytest.fixture
def hook(self):
return InputValidationHook()
@pytest.mark.parametrize("dangerous_cmd", [
"rm -rf /",
"echo 'hola' && rm -rf /",
"DROP TABLE users",
])
def test_bloquea_comandos_peligrosos(self, hook, dangerous_cmd: str):
result = hook.pre_tool_use("Bash", {"command": dangerous_cmd})
assert result is not None
assert result["deny"] is True
def test_permite_comandos_seguros(self, hook):
safe_commands = ["ls -la", "python -m pytest", "git status", "npm install"]
for cmd in safe_commands:
result = hook.pre_tool_use("Bash", {"command": cmd})
assert result is None, f"Bloqueó comando seguro: '{cmd}'"
def test_bloquea_password_hardcodeada_en_write(self, hook):
result = hook.pre_tool_use("Write", {
"file_path": "config.py",
"content": "database_password = 'secret123'"
})
assert result is not None
assert result["deny"] is True
def test_permite_referencia_a_password_sin_valor(self, hook):
result = hook.pre_tool_use("Write", {
"file_path": "config.py",
"content": "password = os.environ.get('DB_PASSWORD')"
})
assert result is None
# ─── Tests de PostToolUse Hooks ────────────────────────────────
class AuditHook:
"""Hook que registra todas las operaciones del agente."""
def __init__(self):
self.audit_log: list[dict] = []
def post_tool_use(
self,
tool_name: str,
tool_input: dict,
tool_output: str
) -> None:
entry = {
"tool": tool_name,
"input_summary": str(tool_input)[:200],
"output_summary": tool_output[:200],
"success": not any(
err in tool_output.lower()
for err in ["error", "exception", "failed", "not found"]
),
}
self.audit_log.append(entry)
def get_failed_operations(self) -> list[dict]:
return [e for e in self.audit_log if not e["success"]]
def get_operations_by_tool(self, tool_name: str) -> list[dict]:
return [e for e in self.audit_log if e["tool"] == tool_name]
class TestAuditHook:
@pytest.fixture
def hook(self):
return AuditHook()
def test_registra_operacion_exitosa(self, hook):
hook.post_tool_use("Read", {"file_path": "main.py"}, "def main():\n pass")
assert len(hook.audit_log) == 1
assert hook.audit_log[0]["tool"] == "Read"
assert hook.audit_log[0]["success"] is True
def test_registra_operacion_fallida(self, hook):
hook.post_tool_use(
"Bash",
{"command": "python missing_file.py"},
"FileNotFoundError: [Errno 2] No such file or directory: 'missing_file.py'"
)
assert len(hook.get_failed_operations()) == 1
def test_multiples_herramientas(self, hook):
hook.post_tool_use("Read", {}, "contenido")
hook.post_tool_use("Bash", {}, "resultado")
hook.post_tool_use("Read", {}, "más contenido")
reads = hook.get_operations_by_tool("Read")
assert len(reads) == 2
def test_log_acumulativo(self, hook):
for i in range(5):
hook.post_tool_use("Read", {"file_path": f"file{i}.py"}, "ok")
assert len(hook.audit_log) == 5
# ─── Test TypeScript de Hooks ──────────────────────────────────
// tests/hooks/hooks.test.ts
import { describe, it, expect, beforeEach } from "vitest";
interface HookResult {
deny: boolean;
reason: string;
}
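import * as path from "node:path";

class SecurityHook {
  private allowedDirs: string[];
  constructor(allowedDirs: string[]) {
    // Normalize once so comparisons run against canonical absolute paths
    this.allowedDirs = allowedDirs.map((dir) => path.resolve(dir));
  }
  preToolUse(toolName: string, toolInput: Record<string, string>): HookResult | null {
    const writingTools = ["Write", "Edit", "MultiEdit"];
    if (!writingTools.includes(toolName)) return null;
    // resolve() collapses "..", so "/proyecto/../etc/passwd" is judged as "/etc/passwd".
    // A missing file_path stays empty and is denied below.
    const filePath = toolInput.file_path ? path.resolve(toolInput.file_path) : "";
    // Accept the directory itself or anything beneath it; a bare startsWith(dir)
    // would also accept siblings such as "/proyecto-evil".
    const allowed = this.allowedDirs.some(
      (dir) => filePath === dir || filePath.startsWith(dir + path.sep),
    );
    if (!allowed) {
      return { deny: true, reason: `Acceso denegado a ${filePath}` };
    }
    return null;
  }
}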
class RateLimitHook {
private limits: Map<string, number>;
private counters: Map<string, number> = new Map();
constructor(limits: Record<string, number>) {
this.limits = new Map(Object.entries(limits));
}
preToolUse(toolName: string, _toolInput: Record<string, unknown>): HookResult | null {
const limit = this.limits.get(toolName);
if (limit === undefined) return null;
const count = this.counters.get(toolName) ?? 0;
if (count >= limit) {
return { deny: true, reason: `Rate limit para ${toolName}: ${count}/${limit}` };
}
this.counters.set(toolName, count + 1);
return null;
}
reset(): void {
this.counters.clear();
}
}
describe("SecurityHook", () => {
let hook: SecurityHook;
beforeEach(() => {
hook = new SecurityHook(["/proyecto", "/tmp"]);
});
it("permite escritura en directorio permitido", () => {
expect(hook.preToolUse("Write", { file_path: "/proyecto/main.py" })).toBeNull();
});
it("deniega escritura fuera del directorio", () => {
const result = hook.preToolUse("Write", { file_path: "/etc/passwd" });
expect(result).not.toBeNull();
expect(result?.deny).toBe(true);
});
it("permite lectura en cualquier directorio", () => {
expect(hook.preToolUse("Read", { file_path: "/etc/hosts" })).toBeNull();
});
it("deniega Edit fuera del directorio", () => {
const result = hook.preToolUse("Edit", { file_path: "/home/otro/archivo.py" });
expect(result?.deny).toBe(true);
});
});
describe("RateLimitHook", () => {
let hook: RateLimitHook;
beforeEach(() => {
hook = new RateLimitHook({ Bash: 3 });
});
it("permite llamadas dentro del límite", () => {
for (let i = 0; i < 3; i++) {
expect(hook.preToolUse("Bash", {})).toBeNull();
}
});
it("deniega al superar el límite", () => {
for (let i = 0; i < 3; i++) hook.preToolUse("Bash", {});
const result = hook.preToolUse("Bash", {});
expect(result?.deny).toBe(true);
});
it("reset permite volver a llamar", () => {
for (let i = 0; i < 3; i++) hook.preToolUse("Bash", {});
hook.reset();
expect(hook.preToolUse("Bash", {})).toBeNull();
});
it("herramientas sin límite siempre pasan", () => {
for (let i = 0; i < 50; i++) {
expect(hook.preToolUse("Read", {})).toBeNull();
}
});
});
Expanded Chapter Summary
mindmap
  root((Agent Testing))
    Fundamentals
      Non-determinism
      API cost
      Side effects
      Testing pyramid
    Unit Level
      SDK mocks
      Isolated hooks
      MCP tools
      Factories and helpers
    Integration Level
      Real SDK + isolated filesystem
      Project factories
      tmpdir with realistic projects
      Contract testing
    E2E Level
      Real API
      Cheap model haiku
      Nightly CI
      Cost management
    Evaluations
      JSONL dataset
      Accuracy recall f1
      LLM-as-judge
      Regression testing
    Performance
      Real latency
      Real cost
      Model comparison
      Data-driven decision
    CI/CD
      GitHub Actions 3 levels
      Dependency caching
      Automatic alerts
      Secure secrets
    Advanced
      Property-based testing
      Snapshot testing
      Mutation testing
      Contract invariants
Testing agents is a balance between coverage, cost, and speed. The right strategy is to maximize free tests (unit tests with mocks) and to be very selective with e2e runs. Evaluation datasets, model benchmarking, and mutation testing are the tools that bring agent systems up to professional quality.