Capítulo 9: Casos de Uso Avanzados
Capítulo 9: Casos de Uso Avanzados
Este capítulo presenta implementaciones completas y funcionales de agentes reales. Cada caso de uso incluye código Python y TypeScript, manejo de errores, y consideraciones de producción.
1. Code Review Automático
Un agente que analiza Pull Requests, detecta bugs, problemas de seguridad y estilo, y genera un reporte detallado en Markdown.
flowchart TD
PR[Pull Request / Diff]
PR --> F[Fetch archivos modificados]
F --> P1[Worker: Análisis de bugs]
F --> P2[Worker: Seguridad]
F --> P3[Worker: Estilo y convenciones]
P1 --> R[Reporte consolidado]
P2 --> R
P3 --> R
R --> CM[Comentario en el PR]
import asyncio
import subprocess
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
from dataclasses import dataclass
from typing import List
@dataclass
class HallazgoReview:
tipo: str # bug, security, style, performance
severidad: str # critical, high, medium, low
archivo: str
linea: int
descripcion: str
sugerencia: str
@dataclass
class ResultadoCodeReview:
hallazgos: List[HallazgoReview]
resumen: str
puntuacion: int # 0-100
aprobado: bool
reporte_markdown: str
def obtener_archivos_modificados(repo_path: str, base_branch: str = "main") -> List[str]:
"""Obtiene la lista de archivos modificados respecto a la rama base."""
try:
resultado = subprocess.run(
["git", "diff", "--name-only", f"origin/{base_branch}...HEAD"],
cwd=repo_path,
capture_output=True,
text=True,
check=True
)
archivos = [a.strip() for a in resultado.stdout.strip().split('\n') if a.strip()]
return [a for a in archivos if a.endswith(('.py', '.ts', '.js', '.tsx', '.jsx'))]
except subprocess.CalledProcessError:
return []
async def revisar_archivo_bugs(archivo: str, contenido: str) -> str:
"""Revisa un archivo buscando bugs y problemas lógicos."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[],
system_prompt="""Eres un revisor de código experto en detectar bugs.
Analiza el código y detecta:
- Null pointer / undefined references
- Condiciones de carrera (race conditions)
- Errores de manejo de errores (excepciones no capturadas)
- Lógica incorrecta o edge cases no manejados
- Memory leaks
- Problemas de concurrencia
Retorna JSON:
[{"tipo": "bug", "severidad": "critical|high|medium|low", "linea": N, "descripcion": "...", "sugerencia": "..."}]
Si no hay bugs: []""",
max_turns=3,
)
resultado = ""
async for m in query(
prompt=f"Revisa este archivo '{archivo}' buscando bugs:\n\n```\n{contenido[:4000]}\n```",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def revisar_archivo_seguridad(archivo: str, contenido: str) -> str:
"""Revisa un archivo buscando vulnerabilidades de seguridad."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[],
system_prompt="""Eres un experto en seguridad de aplicaciones.
Detecta vulnerabilidades OWASP Top 10:
- SQL Injection (A03)
- XSS (A03)
- Broken Authentication (A07)
- Sensitive Data Exposure (A02)
- Injection (A03)
- Secrets hardcodeados
- Uso inseguro de eval/exec
- Path traversal
Retorna JSON:
[{"tipo": "security", "severidad": "critical|high|medium|low", "linea": N, "descripcion": "...", "sugerencia": "..."}]
Si no hay problemas: []""",
max_turns=3,
)
resultado = ""
async for m in query(
prompt=f"Analiza la seguridad de '{archivo}':\n\n```\n{contenido[:4000]}\n```",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def revisar_archivo_estilo(archivo: str, contenido: str, lenguaje: str) -> str:
"""Revisa un archivo por convenciones de estilo."""
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=[],
system_prompt=f"""Eres un revisor de estilo de código {lenguaje}.
Detecta violaciones de:
- Convenciones de nombres (PEP8 para Python, camelCase/PascalCase para TS)
- Funciones demasiado largas (>30 líneas)
- Complejidad ciclomática alta
- Código duplicado
- Falta de type hints/tipos
- Comentarios innecesarios o faltantes donde importan
Retorna JSON:
[{"tipo": "style", "severidad": "low|medium", "linea": N, "descripcion": "...", "sugerencia": "..."}]
Si el código está limpio: []""",
max_turns=3,
)
resultado = ""
async for m in query(
prompt=f"Revisa el estilo de '{archivo}':\n\n```{lenguaje}\n{contenido[:4000]}\n```",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def generar_reporte_review(
archivos_revisados: int,
todos_hallazgos: List[dict],
repo_path: str
) -> str:
"""Genera el reporte final de code review en Markdown."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[],
system_prompt="""Generas reportes de code review profesionales en Markdown.
El reporte debe incluir:
- Resumen ejecutivo con puntuación (0-100)
- Tabla de hallazgos por severidad
- Sección por archivo con hallazgos detallados
- Recomendaciones ordenadas por prioridad
- Conclusión: APROBADO / NECESITA CAMBIOS
Usa emojis apropiados: 🔴 critical, 🟠 high, 🟡 medium, 🟢 low, ✅ aprobado, ❌ requiere cambios""",
max_turns=5,
)
import json
hallazgos_texto = json.dumps(todos_hallazgos, ensure_ascii=False, indent=2)
resultado = ""
async for m in query(
prompt=f"Genera el reporte de code review:\nArchivos revisados: {archivos_revisados}\nHallazgos:\n{hallazgos_texto[:5000]}",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def code_review_completo(repo_path: str, base_branch: str = "main") -> ResultadoCodeReview:
"""
Ejecuta un code review completo del PR.
Analiza bugs, seguridad y estilo en paralelo por archivo.
"""
import json
archivos = obtener_archivos_modificados(repo_path, base_branch)
if not archivos:
archivos = list(Path(repo_path).glob("*.py"))[:5] # Fallback para demo
archivos = [str(a) for a in archivos]
print(f"[Code Review] Revisando {len(archivos)} archivos...")
todos_hallazgos = []
async def revisar_archivo_completo(ruta: str) -> list:
"""Revisa un archivo desde todas las perspectivas."""
archivo = Path(repo_path) / ruta if not ruta.startswith('/') else Path(ruta)
try:
contenido = archivo.read_text(encoding='utf-8')
except (IOError, UnicodeDecodeError):
return []
extension = archivo.suffix.lower()
lenguaje = {"py": "Python", ".ts": "TypeScript", ".js": "JavaScript"}.get(extension, "código")
# Ejecutar los tres tipos de revisión en paralelo
bugs_raw, seguridad_raw, estilo_raw = await asyncio.gather(
revisar_archivo_bugs(str(ruta), contenido),
revisar_archivo_seguridad(str(ruta), contenido),
revisar_archivo_estilo(str(ruta), contenido, lenguaje),
)
hallazgos_archivo = []
for raw in [bugs_raw, seguridad_raw, estilo_raw]:
try:
inicio = raw.find('[')
fin = raw.rfind(']') + 1
if inicio >= 0:
items = json.loads(raw[inicio:fin])
for item in items:
item['archivo'] = str(ruta)
hallazgos_archivo.append(item)
except (json.JSONDecodeError, ValueError):
pass
return hallazgos_archivo
# Revisar todos los archivos con semáforo para controlar concurrencia
semaforo = asyncio.Semaphore(3)
async def revisar_con_limite(ruta):
async with semaforo:
return await revisar_archivo_completo(ruta)
tareas = [revisar_con_limite(a) for a in archivos]
resultados = await asyncio.gather(*tareas)
for hallazgos_archivo in resultados:
todos_hallazgos.extend(hallazgos_archivo)
# Generar reporte
reporte = await generar_reporte_review(len(archivos), todos_hallazgos, repo_path)
# Calcular métricas
criticos = sum(1 for h in todos_hallazgos if h.get('severidad') == 'critical')
altos = sum(1 for h in todos_hallazgos if h.get('severidad') == 'high')
puntuacion = max(0, 100 - criticos * 20 - altos * 10)
aprobado = criticos == 0 and altos <= 2
hallazgos_tipados = [
HallazgoReview(
tipo=h.get('tipo', 'style'),
severidad=h.get('severidad', 'low'),
archivo=h.get('archivo', 'unknown'),
linea=h.get('linea', 0),
descripcion=h.get('descripcion', ''),
sugerencia=h.get('sugerencia', '')
)
for h in todos_hallazgos
]
return ResultadoCodeReview(
hallazgos=hallazgos_tipados,
resumen=f"{len(archivos)} archivos, {len(todos_hallazgos)} hallazgos ({criticos} críticos)",
puntuacion=puntuacion,
aprobado=aprobado,
reporte_markdown=reporte
)
# Uso
async def main():
review = await code_review_completo("/mi/proyecto")
print(f"Puntuación: {review.puntuacion}/100")
print(f"Estado: {'APROBADO' if review.aprobado else 'NECESITA CAMBIOS'}")
print(review.reporte_markdown)
asyncio.run(main())
2. Generador de Tests Automático
Analiza código, genera tests unitarios completos, los ejecuta y los itera hasta que pasen.
flowchart TD
A[Código fuente a testear]
A --> B[Analizar firma y lógica]
B --> C[Generar tests unitarios]
C --> D[Ejecutar tests]
D --> E{¿Pasan?}
E -->|Sí| F[Guardar tests]
E -->|No — max 3 intentos| G[Analizar error]
G --> H[Regenerar tests corregidos]
H --> D
E -->|Intentos agotados| I[Guardar con marcadores de TODO]
import asyncio
import subprocess
import tempfile
import os
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def analizar_codigo_para_tests(archivo: str, contenido: str) -> str:
"""Analiza el código y describe qué necesita ser testeado."""
opciones = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=[],
system_prompt="""Analiza el código y describe:
1. Todas las funciones/métodos públicos
2. Sus parámetros y tipos
3. Casos happy path
4. Casos edge (None, vacío, valores límite)
5. Casos de error esperados
Retorna JSON:
{
"funciones": [
{
"nombre": "...",
"descripcion": "...",
"parametros": [{"nombre": "...", "tipo": "...", "descripcion": "..."}],
"retorna": "...",
"casos_test": ["caso1", "caso2"],
"casos_error": ["error1"]
}
]
}""",
max_turns=3,
)
resultado = ""
async for m in query(
prompt=f"Analiza estas funciones para generar tests:\n\n```python\n{contenido}\n```",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def generar_tests_unitarios(
archivo_fuente: str,
contenido_fuente: str,
analisis: str,
intento: int = 1,
errores_previos: str = ""
) -> str:
"""Genera tests unitarios completos usando pytest."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[],
system_prompt="""Eres un experto en testing con pytest.
Generas tests unitarios completos, bien estructurados y que REALMENTE FUNCIONAN.
REGLAS:
- Usa pytest fixtures cuando sea apropiado
- Cubre happy path, edge cases y errores
- Usa pytest.raises para excepciones esperadas
- Mockea dependencias externas (base de datos, HTTP, archivos) con unittest.mock
- Cada test tiene un nombre descriptivo: test_funcion_cuando_condicion_entonces_resultado
- Agrega docstrings a tests complejos
- NO importes nada que no sea necesario
Retorna SOLO el código Python del archivo de tests, sin explicaciones.""",
max_turns=5,
)
prompt = f"""Genera un archivo completo de tests para este código:
## Archivo fuente: {archivo_fuente}
```python
{contenido_fuente}
Análisis de funciones:
{analisis} """
if errores_previos:
prompt += f"\n## ERRORES DEL INTENTO ANTERIOR (intento {intento-1}):\n{errores_previos}\n\nCorrige estos errores en los tests."
resultado = ""
async for m in query(prompt=prompt, options=opciones):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
# Extraer solo el código Python del resultado
if "```python" in resultado:
inicio = resultado.find("```python") + 9
fin = resultado.find("```", inicio)
resultado = resultado[inicio:fin].strip()
elif "```" in resultado:
inicio = resultado.find("```") + 3
fin = resultado.find("```", inicio)
resultado = resultado[inicio:fin].strip()
return resultado
def ejecutar_tests(archivo_tests: str, directorio: str) -> tuple[bool, str]: """Ejecuta los tests con pytest y retorna (exito, output).""" try: resultado = subprocess.run( [“python”, “-m”, “pytest”, archivo_tests, “-v”, “—tb=short”, “—no-header”], cwd=directorio, capture_output=True, text=True, timeout=30 ) exito = resultado.returncode == 0 output = resultado.stdout + resultado.stderr return exito, output except subprocess.TimeoutExpired: return False, “Tests agotaron el timeout de 30s” except FileNotFoundError: return False, “pytest no está instalado”
async def generar_tests_con_iteracion( archivo_fuente: str, max_intentos: int = 3 ) -> dict: """ Genera tests, los ejecuta y los itera hasta que pasen. Retorna los tests finales y el estado. """ archivo = Path(archivo_fuente) if not archivo.exists(): return {“exito”: False, “error”: f”Archivo no encontrado: {archivo_fuente}“}
contenido_fuente = archivo.read_text(encoding='utf-8')
directorio = str(archivo.parent)
print(f"[Tests] Analizando: {archivo.name}")
analisis = await analizar_codigo_para_tests(str(archivo), contenido_fuente)
errores_previos = ""
codigo_tests = ""
for intento in range(1, max_intentos + 1):
print(f"[Tests] Generando tests — intento {intento}/{max_intentos}")
codigo_tests = await generar_tests_unitarios(
str(archivo),
contenido_fuente,
analisis,
intento,
errores_previos
)
if not codigo_tests.strip():
print(f"[Tests] No se generó código en intento {intento}")
continue
# Crear archivo temporal de tests
archivo_tests = archivo.parent / f"test_{archivo.stem}_gen.py"
archivo_tests.write_text(codigo_tests, encoding='utf-8')
print(f"[Tests] Ejecutando tests...")
exito, output = ejecutar_tests(str(archivo_tests), directorio)
if exito:
print(f"[Tests] ¡Todos los tests pasan en intento {intento}!")
return {
"exito": True,
"intentos": intento,
"archivo_tests": str(archivo_tests),
"codigo_tests": codigo_tests,
"output": output,
}
else:
print(f"[Tests] Fallaron en intento {intento}. Iterando...")
# Extraer errores para el siguiente intento
lineas_error = [
l for l in output.split('\n')
if 'ERROR' in l or 'FAILED' in l or 'error' in l.lower()
]
errores_previos = '\n'.join(lineas_error[:20])
archivo_tests.unlink() # Limpiar archivo fallido
# Guardar con TODOs si no se logró que pasen
archivo_tests_final = archivo.parent / f"test_{archivo.stem}_gen.py"
codigo_con_todos = f"# TODO: Revisar estos tests — no pasaron después de {max_intentos} intentos\n# Último error:\n# {errores_previos[:200].replace(chr(10), chr(10)+'# ')}\n\n{codigo_tests}"
archivo_tests_final.write_text(codigo_con_todos, encoding='utf-8')
return {
"exito": False,
"intentos": max_intentos,
"archivo_tests": str(archivo_tests_final),
"codigo_tests": codigo_con_todos,
"mensaje": f"Tests generados pero no pasan después de {max_intentos} intentos",
}
Uso
async def generar_tests_proyecto(directorio: str): """Genera tests para todos los archivos Python de un proyecto.""" archivos = [ str(p) for p in Path(directorio).glob(”**/*.py”) if not any(excl in str(p) for excl in [“test_”, “pycache”, “.venv”]) ]
print(f"[Tests] Generando tests para {len(archivos)} archivos...")
for archivo in archivos:
resultado = await generar_tests_con_iteracion(archivo)
estado = "OK" if resultado["exito"] else "INCOMPLETO"
print(f"[Tests] {estado}: {Path(archivo).name} ({resultado.get('intentos', 0)} intentos)")
asyncio.run(generar_tests_proyecto(“/mi/proyecto/src”))
---
## 3. Agente de Migración de Framework
Migra código de un framework a otro manteniendo la funcionalidad.
```python
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
from typing import Dict
ESTRATEGIAS_MIGRACION = {
"flask_to_fastapi": {
"descripcion": "Migrar Flask a FastAPI",
"system_prompt": """Eres un experto en migración de Flask a FastAPI.
Reglas de migración:
- @app.route -> @app.get/@app.post/@app.put/@app.delete
- request.json -> parámetros del cuerpo con Pydantic
- jsonify({}) -> retornar dict directamente o Pydantic model
- request.args -> Query parameters tipados
- Agregar type hints a TODAS las funciones
- Usar async def para endpoints I/O-bound
- Mantener TODA la funcionalidad existente
Retorna SOLO el código Python migrado, sin explicaciones."""
},
"python2_to_python3": {
"descripcion": "Migrar Python 2 a Python 3",
"system_prompt": """Eres un experto en migración de Python 2 a Python 3.
Migraciones automáticas:
- print statement -> print() function
- unicode/str/bytes -> str/bytes unificados
- xrange() -> range()
- dict.iteritems() -> dict.items()
- raw_input() -> input()
- // para división entera explícita
- except ExceptionType, var -> except ExceptionType as var
- raise Exception, msg -> raise Exception(msg)
- Remover __future__ imports obsoletos
Retorna SOLO el código Python 3 migrado."""
},
"jquery_to_vanilla": {
"descripcion": "Migrar jQuery a JavaScript moderno",
"system_prompt": """Eres un experto en migración de jQuery a JavaScript moderno (ES2020+).
Equivalencias:
- $(selector) -> document.querySelector(selector)
- $.ajax() -> fetch()
- $.each() -> Array.forEach() / for...of
- $(el).addClass() -> el.classList.add()
- $(el).on() -> el.addEventListener()
- $(el).css() -> el.style.property = value
- $.extend() -> Object.assign() o spread operator
- $.Deferred() -> Promise
Retorna SOLO el código JS moderno."""
}
}
async def migrar_archivo(
archivo: str,
estrategia: str,
max_intentos: int = 2
) -> dict:
"""Migra un archivo usando la estrategia especificada."""
config = ESTRATEGIAS_MIGRACION.get(estrategia)
if not config:
return {"exito": False, "error": f"Estrategia desconocida: {estrategia}"}
ruta = Path(archivo)
if not ruta.exists():
return {"exito": False, "error": "Archivo no encontrado"}
contenido_original = ruta.read_text(encoding='utf-8')
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[],
system_prompt=config["system_prompt"],
max_turns=5,
)
for intento in range(max_intentos):
resultado = ""
async for m in query(
prompt=f"Migra este código:\n\n```\n{contenido_original}\n```",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
# Extraer código del resultado
codigo_migrado = resultado
if "```" in resultado:
for delimitador in ["```python\n", "```javascript\n", "```\n"]:
if delimitador in resultado:
inicio = resultado.find(delimitador) + len(delimitador)
fin = resultado.find("```", inicio)
if fin > inicio:
codigo_migrado = resultado[inicio:fin].strip()
break
if codigo_migrado and len(codigo_migrado) > 10:
# Crear backup del original
backup = ruta.with_suffix(ruta.suffix + '.bak')
backup.write_text(contenido_original, encoding='utf-8')
# Escribir el código migrado
ruta.write_text(codigo_migrado, encoding='utf-8')
return {
"exito": True,
"archivo": str(ruta),
"backup": str(backup),
"lineas_antes": len(contenido_original.split('\n')),
"lineas_despues": len(codigo_migrado.split('\n')),
"estrategia": estrategia,
}
return {"exito": False, "error": "No se pudo generar código migrado"}
async def migrar_proyecto(
directorio: str,
estrategia: str,
patron: str = "**/*.py"
) -> dict:
"""Migra todos los archivos de un proyecto."""
archivos = [
str(p) for p in Path(directorio).glob(patron)
if not any(excl in str(p) for excl in ["__pycache__", ".venv", "test_"])
]
print(f"[Migración] {ESTRATEGIAS_MIGRACION[estrategia]['descripcion']}")
print(f"[Migración] Migrando {len(archivos)} archivos...")
semaforo = asyncio.Semaphore(3)
async def migrar_con_limite(archivo):
async with semaforo:
return await migrar_archivo(archivo, estrategia)
tareas = [migrar_con_limite(a) for a in archivos]
resultados = await asyncio.gather(*tareas, return_exceptions=True)
exitosos = [r for r in resultados if isinstance(r, dict) and r.get('exito')]
fallidos = [r for r in resultados if isinstance(r, dict) and not r.get('exito')]
return {
"total": len(archivos),
"exitosos": len(exitosos),
"fallidos": len(fallidos),
"archivos_migrados": [r['archivo'] for r in exitosos],
}
# Uso
asyncio.run(migrar_proyecto("/mi/proyecto/api", "flask_to_fastapi"))
4. Agente de Documentación
Genera docstrings, README, y diagramas para un proyecto.
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def generar_docstrings(archivo: str) -> dict:
"""Agrega docstrings a todas las funciones y clases sin documentar."""
ruta = Path(archivo)
contenido = ruta.read_text(encoding='utf-8')
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Edit", "Read"],
system_prompt="""Eres un experto en documentación de Python.
Agrega docstrings formato Google a TODAS las funciones y clases que no tengan documentación.
Formato:
def funcion(param: tipo) -> tipo: """Descripción breve en una línea.
Args:
param: Descripción del parámetro.
Returns:
Descripción del valor de retorno.
Raises:
ValueError: Cuando el parámetro es inválido.
\"\"\"
NO modifiques el código, solo agrega la documentación faltante.""",
max_turns=10,
cwd=str(ruta.parent),
)
resultado = ""
async for m in query(
prompt=f"Agrega docstrings a las funciones sin documentar en: {archivo}",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return {"archivo": archivo, "resultado": resultado}
async def generar_readme(directorio: str) -> str:
"""Genera un README.md completo para el proyecto."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un experto en documentación de proyectos open source.
Analiza el código del proyecto y genera un README.md completo con:
## Secciones OBLIGATORIAS:
1. Nombre y descripción breve
2. Badges (build, coverage, versión, licencia)
3. Características principales
4. Instalación (con código)
5. Uso básico (con ejemplos de código)
6. API Reference (funciones/endpoints principales)
7. Configuración (variables de entorno)
8. Testing (cómo ejecutar tests)
9. Contribución
10. Licencia
Usa Markdown bien formateado con código resaltado.""",
max_turns=15,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt="Genera un README.md completo para este proyecto analizando el código fuente.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
# Guardar el README
readme_path = Path(directorio) / "README.md"
if resultado:
readme_path.write_text(resultado, encoding='utf-8')
print(f"[Documentación] README generado: {readme_path}")
return resultado
async def generar_diagrama_arquitectura(directorio: str) -> str:
"""Genera un diagrama Mermaid de la arquitectura del proyecto."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Analiza el código y genera un diagrama Mermaid de la arquitectura.
Identifica:
- Módulos principales y sus responsabilidades
- Dependencias entre módulos
- Capas de la aplicación (presentación, negocio, datos)
- Servicios externos (BD, APIs, etc.)
Retorna SOLO el bloque Mermaid:
```mermaid
graph TD
A[Módulo A] --> B[Módulo B]
...
```""",
max_turns=8,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt="Genera un diagrama Mermaid de la arquitectura de este proyecto.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def documentar_proyecto_completo(directorio: str):
"""Documentación completa de un proyecto."""
archivos_py = [
str(p) for p in Path(directorio).glob("**/*.py")
if not any(e in str(p) for e in ["__pycache__", "test_", ".venv"])
]
print(f"[Documentación] Documentando {len(archivos_py)} archivos...")
# Docstrings en paralelo con límite de concurrencia
semaforo = asyncio.Semaphore(3)
async def doc_con_limite(f):
async with semaforo:
return await generar_docstrings(f)
await asyncio.gather(*[doc_con_limite(f) for f in archivos_py])
# README
print("[Documentación] Generando README...")
await generar_readme(directorio)
# Diagrama
print("[Documentación] Generando diagrama de arquitectura...")
diagrama = await generar_diagrama_arquitectura(directorio)
if diagrama:
(Path(directorio) / "ARCHITECTURE.md").write_text(
f"# Arquitectura\n\n{diagrama}", encoding='utf-8'
)
print("[Documentación] ¡Completado!")
5. Agente de Soporte Técnico
Analiza logs, diagnostica problemas y sugiere soluciones.
import asyncio
from claude_code_sdk import query, ClaudeCodeOptions
from pathlib import Path
import json
async def agente_soporte_tecnico(
descripcion_problema: str,
directorio_logs: str = None,
archivos_config: list = None
) -> dict:
"""
Agente de soporte que analiza logs y diagnostica problemas.
"""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob", "Grep", "Bash"],
disallowed_tools=["Write", "Edit"],
system_prompt="""Eres un ingeniero de soporte técnico senior experto en diagnóstico.
Tu metodología:
1. Analizar los logs relevantes
2. Identificar el error root cause (NO los síntomas)
3. Verificar configuraciones relacionadas
4. Proponer soluciones ordenadas por probabilidad de éxito
IMPORTANTE:
- Sé específico: archivo, línea, configuración exacta
- Distingue entre causa raíz y síntomas
- Si hay múltiples causas posibles, ordénalas por probabilidad
- Propón pasos de diagnóstico adicionales si la información es insuficiente
Formato de respuesta:
## Diagnóstico
[causa raíz identificada]
## Evidencia
[logs/configs que sustentan el diagnóstico]
## Soluciones (ordenadas por probabilidad)
1. [solución más probable] - Pasos: ...
2. [solución alternativa] - Pasos: ...
## Prevención
[cómo evitar este problema en el futuro]""",
max_turns=15,
cwd=directorio_logs or ".",
)
contexto = [f"Problema reportado:\n{descripcion_problema}"]
if directorio_logs:
contexto.append(f"\nDirectorio de logs: {directorio_logs}")
if archivos_config:
contexto.append(f"\nArchivos de configuración a revisar: {', '.join(archivos_config)}")
prompt = "\n".join(contexto) + "\n\nDiagnostica el problema y propón soluciones."
resultado = ""
async for m in query(prompt=prompt, options=opciones):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return {
"diagnostico": resultado,
"problema": descripcion_problema,
}
# Uso
async def main():
resultado = await agente_soporte_tecnico(
descripcion_problema="""
El servidor de producción está respondiendo con errores 502 Bad Gateway.
Empezó hace 20 minutos. Los logs de nginx muestran 'upstream timed out'.
La carga del servidor no es inusualmente alta.
""",
directorio_logs="/var/log/myapp",
archivos_config=["/etc/nginx/sites-enabled/myapp.conf", "/etc/myapp/config.yaml"]
)
print(resultado["diagnostico"])
asyncio.run(main())
6. Agente de DevOps
Genera Dockerfiles, CI/CD configs y monitorea infraestructura.
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def generar_dockerfile(directorio: str) -> str:
"""Genera un Dockerfile optimizado para el proyecto."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un experto en Docker y contenedores.
Analiza el proyecto y genera un Dockerfile optimizado:
REQUISITOS:
- Multi-stage build para reducir tamaño final
- Usuario no-root en producción
- Layer caching optimizado (dependencias antes que código)
- Health check incluido
- Variables de entorno documentadas
- .dockerignore apropiado
Identifica automáticamente:
- Lenguaje y versión requerida
- Dependencias del sistema operativo
- Puerto de la aplicación
- Comando de inicio""",
max_turns=10,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt="Analiza el proyecto y genera un Dockerfile optimizado para producción.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
# Extraer Dockerfile del resultado
if "```dockerfile" in resultado.lower():
for delim in ["```dockerfile\n", "```Dockerfile\n"]:
if delim in resultado:
inicio = resultado.find(delim) + len(delim)
fin = resultado.find("```", inicio)
dockerfile = resultado[inicio:fin].strip()
Path(directorio).joinpath("Dockerfile").write_text(dockerfile)
return dockerfile
return resultado
async def generar_github_actions(directorio: str) -> dict:
"""Genera workflows de GitHub Actions para CI/CD."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un experto en GitHub Actions y CI/CD.
Analiza el proyecto y genera workflows completos:
1. CI workflow (en .github/workflows/ci.yml):
- Linting y format check
- Tests unitarios y de integración
- Coverage report
- Build verification
2. CD workflow (en .github/workflows/cd.yml):
- Build de imagen Docker
- Push a registry
- Deploy a staging en PR
- Deploy a producción en main
Detecta automáticamente el lenguaje y herramientas usadas.""",
max_turns=10,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt="Genera workflows completos de GitHub Actions para CI/CD de este proyecto.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
# Crear directorio de workflows
workflows_dir = Path(directorio) / ".github" / "workflows"
workflows_dir.mkdir(parents=True, exist_ok=True)
return {"resultado": resultado, "workflows_dir": str(workflows_dir)}
async def generar_docker_compose(directorio: str) -> str:
"""Genera docker-compose.yml para desarrollo local."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un experto en Docker Compose.
Analiza el proyecto y genera docker-compose.yml para desarrollo local con:
- Servicio principal de la aplicación
- Servicios de dependencias (PostgreSQL, Redis, etc.) si son detectados
- Volúmenes para hot reload en desarrollo
- Variables de entorno con valores por defecto seguros
- Healthchecks para servicios críticos
- Redes apropiadas entre servicios
También genera docker-compose.override.yml para overrides de desarrollo.""",
max_turns=10,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt="Genera docker-compose.yml completo para este proyecto.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return resultado
async def setup_devops_completo(directorio: str):
"""Configura todo el stack DevOps de un proyecto."""
print("[DevOps] Configurando infraestructura...")
print("[DevOps] Generando Dockerfile...")
dockerfile = await generar_dockerfile(directorio)
print("[DevOps] Generando docker-compose...")
docker_compose = await generar_docker_compose(directorio)
print("[DevOps] Generando GitHub Actions...")
github_actions = await generar_github_actions(directorio)
print("[DevOps] Configuración completa!")
return {
"dockerfile": "Generado" if dockerfile else "Error",
"docker_compose": "Generado" if docker_compose else "Error",
"github_actions": "Generado" if github_actions else "Error",
}
7. Agente de Investigación
Busca en múltiples fuentes, sintetiza información y genera reportes.
import { query, ClaudeCodeOptions } from "@anthropic-ai/claude-code-sdk";
interface FuenteInvestigacion {
tipo: "archivo" | "directorio" | "url";
ruta: string;
}
interface ReporteInvestigacion {
tema: string;
hallazgos: string[];
conclusiones: string;
reporte: string;
}
async function investigarTema(
tema: string,
fuentes: FuenteInvestigacion[],
directorio: string
): Promise<ReporteInvestigacion> {
const opciones: ClaudeCodeOptions = {
model: "claude-opus-4-5",
allowedTools: ["Read", "Glob", "Grep"],
disallowedTools: ["Bash", "Write", "Edit"],
systemPrompt: `Eres un investigador experto.
Investiga exhaustivamente el tema: "${tema}"
Busca en todos los archivos disponibles: código, documentación, comentarios, configs.
Sintetiza la información encontrada en hallazgos concretos y accionables.
Cita siempre las fuentes (archivo:línea).`,
maxTurns: 20,
cwd: directorio,
};
const fuentesTexto = fuentes
.map((f) => `- ${f.tipo}: ${f.ruta}`)
.join("\n");
let reporte = "";
for await (const m of query({
prompt: `Investiga "${tema}" en estas fuentes:\n${fuentesTexto}\n\nGenera un reporte detallado.`,
options: opciones,
})) {
if (m.type === "assistant" && m.message.content) {
for (const b of m.message.content) {
if (b.type === "text") reporte = b.text;
}
}
}
return {
tema,
hallazgos: [],
conclusiones: "",
reporte,
};
}
// Uso en TypeScript
async function investigarMultiplesTemas(
temas: string[],
directorio: string
): Promise<ReporteInvestigacion[]> {
const fuentes: FuenteInvestigacion[] = [
{ tipo: "directorio", ruta: directorio },
];
// Investigar todos los temas en paralelo (con límite)
const CONCURRENCIA = 3;
const resultados: ReporteInvestigacion[] = [];
for (let i = 0; i < temas.length; i += CONCURRENCIA) {
const grupo = temas.slice(i, i + CONCURRENCIA);
const parciales = await Promise.all(
grupo.map((tema) => investigarTema(tema, fuentes, directorio))
);
resultados.push(...parciales);
}
return resultados;
}
8. Agente de Seguridad
Escanea vulnerabilidades OWASP y genera fixes.
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
OWASP_TOP_10 = {
"A01": "Broken Access Control",
"A02": "Cryptographic Failures",
"A03": "Injection",
"A04": "Insecure Design",
"A05": "Security Misconfiguration",
"A06": "Vulnerable Components",
"A07": "Identification and Authentication Failures",
"A08": "Software and Data Integrity Failures",
"A09": "Security Logging Failures",
"A10": "Server-Side Request Forgery",
}
async def escanear_vulnerabilidades(directorio: str) -> dict:
"""Escanea el proyecto buscando vulnerabilidades OWASP Top 10."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt=f"""Eres un experto en seguridad de aplicaciones especializado en OWASP Top 10.
Categorías a buscar:
{chr(10).join(f"- {k}: {v}" for k, v in OWASP_TOP_10.items())}
Para cada vulnerabilidad encontrada, retorna JSON:
{{
"vulnerabilidades": [
{{
"categoria": "A01",
"descripcion": "...",
"archivo": "ruta/archivo.py",
"linea": N,
"codigo_vulnerable": "fragmento del código",
"severidad": "critical|high|medium|low",
"fix_sugerido": "descripción de cómo arreglarlo"
}}
]
}}
Si no hay vulnerabilidades: {{"vulnerabilidades": []}}""",
max_turns=20,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt=f"Escanea todo el código de este proyecto en busca de vulnerabilidades OWASP Top 10.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
import json
try:
inicio = resultado.find('{"vulnerabilidades"')
if inicio >= 0:
fin = resultado.rfind('}') + 1
datos = json.loads(resultado[inicio:fin])
return datos
except (json.JSONDecodeError, ValueError):
pass
return {"vulnerabilidades": [], "raw": resultado}
async def generar_fix_seguridad(vulnerabilidad: dict, directorio: str) -> dict:
"""Genera y aplica un fix para una vulnerabilidad específica."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Edit"],
disallowed_tools=["Bash", "Write", "Glob", "Grep"],
system_prompt=f"""Eres un experto en seguridad que arregla vulnerabilidades {vulnerabilidad['categoria']}.
Aplica el fix de seguridad mínimamente invasivo:
- Arregla SOLO la vulnerabilidad identificada
- NO cambies la funcionalidad
- Agrega comentario explicando el fix
- Sigue las mejores prácticas de seguridad""",
max_turns=5,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt=f"""Arregla esta vulnerabilidad:
Archivo: {vulnerabilidad['archivo']}
Línea: {vulnerabilidad['linea']}
Problema: {vulnerabilidad['descripcion']}
Fix sugerido: {vulnerabilidad['fix_sugerido']}
Aplica el fix directamente en el archivo.""",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return {
"vulnerabilidad": vulnerabilidad,
"fix_aplicado": resultado,
"exito": len(resultado) > 0
}
async def auditoria_seguridad_completa(directorio: str) -> dict:
"""Auditoría de seguridad completa: escaneo y remediación automática."""
print("[Seguridad] Iniciando auditoría OWASP Top 10...")
# Escaneo
escaneo = await escanear_vulnerabilidades(directorio)
vulnerabilidades = escaneo.get("vulnerabilidades", [])
print(f"[Seguridad] Encontradas {len(vulnerabilidades)} vulnerabilidades")
if not vulnerabilidades:
return {"estado": "LIMPIO", "vulnerabilidades": 0}
# Clasificar por severidad
criticas = [v for v in vulnerabilidades if v.get("severidad") == "critical"]
altas = [v for v in vulnerabilidades if v.get("severidad") == "high"]
# Aplicar fixes automáticos para críticos y altos
fixes_aplicados = []
para_fix = criticas + altas
semaforo = asyncio.Semaphore(2) # Máximo 2 fixes en paralelo
async def fix_con_limite(vuln):
async with semaforo:
return await generar_fix_seguridad(vuln, directorio)
if para_fix:
print(f"[Seguridad] Aplicando {len(para_fix)} fixes automáticos...")
resultados_fix = await asyncio.gather(*[fix_con_limite(v) for v in para_fix])
fixes_aplicados = [r for r in resultados_fix if r.get("exito")]
return {
"estado": "VULNERABILIDADES_ENCONTRADAS",
"total_vulnerabilidades": len(vulnerabilidades),
"criticas": len(criticas),
"altas": len(altas),
"medias": len([v for v in vulnerabilidades if v.get("severidad") == "medium"]),
"bajas": len([v for v in vulnerabilidades if v.get("severidad") == "low"]),
"fixes_aplicados": len(fixes_aplicados),
"vulnerabilidades": vulnerabilidades,
}
asyncio.run(auditoria_seguridad_completa("/mi/proyecto"))
9. Agente de Performance
Profila código, identifica bottlenecks y sugiere optimizaciones.
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def analizar_performance(directorio: str) -> dict:
"""Analiza el código buscando problemas de performance."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un experto en optimización de performance de Python.
Busca específicamente:
1. N+1 queries (loops que ejecutan queries de BD)
2. Operaciones I/O síncronas que deberían ser async
3. Cálculos redundantes dentro de loops
4. Objetos grandes serializados/deserializados innecesariamente
5. Falta de caching en operaciones costosas repetidas
6. Uso ineficiente de estructuras de datos (lista cuando debería ser set/dict)
7. String concatenation en loops (usar join o f-strings)
8. Imports costosos dentro de funciones
Para cada problema, retorna JSON:
{
"problemas": [
{
"tipo": "n+1_query|sync_io|redundant_calc|bad_data_structure|missing_cache",
"archivo": "...",
"linea": N,
"descripcion": "...",
"impacto": "alto|medio|bajo",
"solucion": "código exacto de la solución",
"mejora_estimada": "X% más rápido"
}
]
}""",
max_turns=15,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt="Analiza el código buscando problemas de performance y bottlenecks.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
import json
try:
inicio = resultado.find('{"problemas"')
if inicio >= 0:
fin = resultado.rfind('}') + 1
return json.loads(resultado[inicio:fin])
except Exception:
pass
return {"problemas": [], "raw": resultado}
async def optimizar_funcion(archivo: str, linea: int, descripcion: str) -> bool:
"""Aplica optimización a una función específica."""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Edit"],
disallowed_tools=["Bash", "Write"],
system_prompt=f"""Aplica la optimización de performance descrita.
Problema en línea {linea}: {descripcion}
REGLAS:
- Mantén la funcionalidad exactamente igual
- Aplica SOLO la optimización descrita
- Agrega comentario: # OPTIMIZADO: descripción del cambio
- Verifica que la lógica es correctamente equivalente""",
max_turns=5,
cwd=str(Path(archivo).parent),
)
resultado = ""
async for m in query(
prompt=f"Aplica la optimización en: {archivo}",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return len(resultado) > 0
async def optimizacion_automatica(directorio: str, solo_impacto_alto: bool = True) -> dict:
"""Analiza y aplica optimizaciones automáticamente."""
print("[Performance] Analizando bottlenecks...")
analisis = await analizar_performance(directorio)
problemas = analisis.get("problemas", [])
if solo_impacto_alto:
problemas = [p for p in problemas if p.get("impacto") == "alto"]
print(f"[Performance] {len(problemas)} problemas de impacto alto encontrados")
optimizados = 0
for problema in problemas:
print(f"[Performance] Optimizando: {problema.get('tipo')} en {problema.get('archivo')}")
exito = await optimizar_funcion(
problema["archivo"],
problema["linea"],
problema["descripcion"]
)
if exito:
optimizados += 1
return {
"problemas_encontrados": len(analisis.get("problemas", [])),
"problemas_alto_impacto": len(problemas),
"optimizaciones_aplicadas": optimizados,
}
10. Agente de API: Generador de Clientes SDK
Genera clientes SDK desde especificaciones OpenAPI.
import asyncio
import json
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def generar_cliente_sdk(
spec_openapi: str,
lenguaje: str = "python",
directorio_output: str = "./sdk"
) -> dict:
"""
Genera un cliente SDK tipado desde una especificación OpenAPI.
"""
# Cargar la especificación
spec_path = Path(spec_openapi)
if spec_path.exists():
spec_contenido = spec_path.read_text(encoding='utf-8')
else:
spec_contenido = spec_openapi # Asumir que es el contenido directamente
templates = {
"python": """Genera un cliente SDK en Python con:
- Clase principal: APIClient con base_url y api_key
- Un método por endpoint con type hints completos
- Modelos Pydantic para request/response bodies
- Manejo de errores con excepciones custom (APIError, AuthError, RateLimitError)
- Retry automático para 429 y 5xx
- Async/await usando httpx
- Docstrings completos
Retorna un solo archivo Python completo.""",
"typescript": """Genera un cliente SDK en TypeScript con:
- Clase principal: APIClient con baseURL y apiKey
- Un método async por endpoint con tipos TypeScript
- Interfaces para todos los schemas
- Manejo de errores con tipos discriminados
- Retry automático para 429 y 5xx
- fetch nativo o axios
- JSDoc en todos los métodos
Retorna un solo archivo TypeScript completo.""",
}
system_prompt = templates.get(lenguaje, templates["python"])
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=[], # Solo generación de texto
system_prompt=system_prompt,
max_turns=5,
)
resultado = ""
async for m in query(
prompt=f"Genera el cliente SDK {lenguaje} para esta especificación OpenAPI:\n\n```json\n{spec_contenido[:8000]}\n```",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
# Extraer código del resultado
codigo = resultado
for delim in [f"```{lenguaje}\n", "```python\n", "```typescript\n", "```\n"]:
if delim in resultado:
inicio = resultado.find(delim) + len(delim)
fin = resultado.find("```", inicio)
if fin > inicio:
codigo = resultado[inicio:fin].strip()
break
# Guardar el SDK generado
output_dir = Path(directorio_output)
output_dir.mkdir(parents=True, exist_ok=True)
extension = "py" if lenguaje == "python" else "ts"
archivo_sdk = output_dir / f"client.{extension}"
archivo_sdk.write_text(codigo, encoding='utf-8')
return {
"exito": True,
"archivo": str(archivo_sdk),
"lenguaje": lenguaje,
"lineas": len(codigo.split('\n')),
}
# Uso
asyncio.run(generar_cliente_sdk(
"openapi.json",
lenguaje="python",
directorio_output="./generated-sdk"
))
11. Pipeline de Generación de Features
De la idea al Pull Request: plan → código → tests → review → PR.
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def pipeline_feature_completo(
descripcion_feature: str,
repositorio: str
) -> dict:
"""
Pipeline completo: descripción → plan → código → tests → PR.
"""
print(f"[Feature Pipeline] Iniciando: {descripcion_feature[:60]}")
# FASE 1: Planificación
print("[Feature] Fase 1: Planificación...")
opciones_plan = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un arquitecto de software senior.
Analiza el repositorio y planifica la implementación del feature solicitado.
Retorna un plan técnico detallado con:
1. Archivos a crear/modificar
2. Cambios de schema si aplica
3. Nuevas dependencias si aplica
4. Plan de testing
5. Riesgos y consideraciones""",
max_turns=10,
cwd=repositorio,
)
plan = ""
async for m in query(
prompt=f"Planifica cómo implementar este feature: {descripcion_feature}",
options=opciones_plan
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
plan = b.text
print(f"[Feature] Plan generado ({len(plan)} chars)")
# FASE 2: Implementación
print("[Feature] Fase 2: Implementación...")
opciones_impl = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"],
system_prompt=f"""Eres un developer senior implementando un feature.
Plan de implementación:
{plan[:2000]}
REGLAS:
- Implementa EXACTAMENTE lo planificado, no más
- Sigue las convenciones de código existentes en el proyecto
- Escribe código limpio, tipado y documentado
- Al finalizar, confirma cada archivo creado/modificado""",
max_turns=25,
cwd=repositorio,
)
implementacion = ""
async for m in query(
prompt=f"Implementa el feature: {descripcion_feature}\n\nSigue el plan generado.",
options=opciones_impl
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
implementacion = b.text
# FASE 3: Tests
print("[Feature] Fase 3: Generación de tests...")
opciones_tests = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Write", "Bash", "Glob"],
system_prompt="""Eres un QA engineer generando tests para un nuevo feature.
Genera tests que cubran:
- Happy path completo
- Edge cases críticos
- Tests de integración si hay interacciones con BD/APIs
Ejecuta los tests al finalizar para verificar que pasan.""",
max_turns=15,
cwd=repositorio,
)
tests = ""
async for m in query(
prompt=f"Genera y ejecuta tests para el feature recién implementado: {descripcion_feature}",
options=opciones_tests
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
tests = b.text
# FASE 4: Preparar PR
print("[Feature] Fase 4: Preparando PR...")
opciones_pr = ClaudeCodeOptions(
model="claude-haiku-4-5",
allowed_tools=["Bash"], # Solo git commands
system_prompt="""Prepara los cambios para un Pull Request:
1. git add de todos los archivos nuevos/modificados
2. git commit con mensaje descriptivo en formato convencional (feat: ...)
3. Genera el body del PR en Markdown con: Summary, Changes, Testing, Screenshots si aplica""",
max_turns=5,
cwd=repositorio,
)
pr_info = ""
async for m in query(
prompt=f"Prepara el commit y el body del PR para: {descripcion_feature}",
options=opciones_pr
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
pr_info = b.text
print("[Feature] Pipeline completado!")
return {
"feature": descripcion_feature,
"plan": plan,
"implementacion": implementacion,
"tests": tests,
"pr_info": pr_info,
}
# Uso
asyncio.run(pipeline_feature_completo(
"Agregar autenticación con Google OAuth2 al endpoint /auth/google",
"/mi/proyecto"
))
12. Agente de Legacy Code
Entiende código legacy sin documentación, agrega tests y documentación.
import asyncio
from pathlib import Path
from claude_code_sdk import query, ClaudeCodeOptions
async def analizar_codigo_legacy(directorio: str) -> dict:
"""
Analiza código legacy: entiende qué hace, detecta patrones problemáticos
y genera un mapa de comprensión del sistema.
"""
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Read", "Glob", "Grep"],
disallowed_tools=["Bash", "Write", "Edit"],
system_prompt="""Eres un arqueólogo de código legacy.
Tu especialidad es entender sistemas antiguos sin documentación.
Para este análisis:
1. Identifica el propósito del sistema
2. Mapea los componentes principales
3. Detecta el flujo de datos principal
4. Identifica qué es crítico (no tocar sin tests) vs qué es seguro de refactorizar
5. Lista las dependencias externas y sus versiones si puedes detectarlas
6. Detecta deuda técnica crítica
7. Identifica "code smells" legados (god objects, spaghetti, magic numbers)
Genera un informe de comprensión del sistema en Markdown.""",
max_turns=20,
cwd=directorio,
)
resultado = ""
async for m in query(
prompt="Analiza y documenta este sistema legacy. Necesito entender qué hace y cómo.",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
return {"analisis": resultado}
async def agregar_tests_caracterizacion(
archivo: str,
analisis_previo: str
) -> dict:
"""
Genera tests de caracterización para código legacy.
Los tests de caracterización documentan el comportamiento ACTUAL,
permitiendo refactorizar con confianza.
"""
ruta = Path(archivo)
contenido = ruta.read_text(encoding='utf-8')
opciones = ClaudeCodeOptions(
model="claude-opus-4-5",
allowed_tools=["Write"],
disallowed_tools=["Bash", "Edit", "Read"],
system_prompt=f"""Eres un experto en refactoring seguro de código legacy.
Generas tests de CARACTERIZACIÓN — tests que documentan el comportamiento ACTUAL,
no el comportamiento ideal.
Contexto del sistema:
{analisis_previo[:1000]}
PRINCIPIOS de tests de caracterización:
- Prueban EXACTAMENTE el comportamiento actual (incluyendo bugs conocidos)
- NO prueban el comportamiento ideal o deseado
- Son la red de seguridad para refactorizar después
- Usan valores reales del dominio, no valores arbitrarios
- Tienen nombres que describen el comportamiento observado: test_cuando_X_actualmente_hace_Y
Genera el archivo de tests de caracterización.""",
max_turns=8,
cwd=str(ruta.parent),
)
resultado = ""
async for m in query(
prompt=f"Genera tests de caracterización para:\n\n```python\n{contenido[:4000]}\n```",
options=opciones
):
if hasattr(m, 'content'):
for b in m.content:
if hasattr(b, 'text'):
resultado = b.text
# Guardar tests de caracterización
archivo_tests = ruta.parent / f"test_{ruta.stem}_characterization.py"
codigo_tests = resultado
if "```python" in resultado:
inicio = resultado.find("```python") + 9
fin = resultado.find("```", inicio)
codigo_tests = resultado[inicio:fin].strip()
if codigo_tests:
archivo_tests.write_text(codigo_tests, encoding='utf-8')
return {
"archivo_original": archivo,
"archivo_tests": str(archivo_tests),
"tests_generados": len([l for l in codigo_tests.split('\n') if l.strip().startswith('def test_')])
}
async def modernizar_legacy(directorio: str) -> dict:
"""
Pipeline completo de modernización de código legacy:
1. Analizar y entender
2. Agregar tests de caracterización
3. Documentar
4. Identificar refactorizaciones seguras
"""
print("[Legacy] Analizando sistema legacy...")
analisis = await analizar_codigo_legacy(directorio)
# Guardar análisis
analisis_path = Path(directorio) / "LEGACY_ANALYSIS.md"
analisis_path.write_text(analisis["analisis"], encoding='utf-8')
print(f"[Legacy] Análisis guardado en: {analisis_path}")
# Agregar tests de caracterización a archivos principales
archivos_py = sorted(
[p for p in Path(directorio).glob("**/*.py")
if not any(e in str(p) for e in ["test_", "__pycache__", ".venv"])],
key=lambda p: p.stat().st_size,
reverse=True # Empezar por los archivos más grandes (más críticos)
)[:5] # Procesar los 5 más grandes
tests_generados = []
for archivo in archivos_py:
print(f"[Legacy] Generando tests de caracterización para: {archivo.name}")
resultado = await agregar_tests_caracterizacion(str(archivo), analisis["analisis"])
tests_generados.append(resultado)
return {
"analisis": analisis["analisis"][:500],
"analisis_completo": str(analisis_path),
"tests_de_caracterizacion": tests_generados,
"total_tests": sum(r.get("tests_generados", 0) for r in tests_generados),
}
asyncio.run(modernizar_legacy("/mi/proyecto/legacy"))
Resumen del capítulo
Este capítulo presentó 12 implementaciones completas de agentes reales:
mindmap
root((Casos de Uso))
Calidad de Código
Code Review Automático
Generador de Tests
Análisis de Performance
Transformación
Migración de Frameworks
Modernización de Legacy
Documentación
Docstrings automáticos
README generation
Diagramas de arquitectura
DevOps
Dockerfile generation
GitHub Actions
Docker Compose
Seguridad
Escaneo OWASP Top 10
Fixes automáticos
Productividad
Feature Pipeline completo
Generador de SDK
Agente de soporte técnico
Próximo capítulo: Mejores prácticas y patrones de producción — cómo llevar estos agentes a producción de forma robusta y segura.