Capítulo 22: Monitoreo y Observabilidad
Capítulo 22: Monitoreo y Observabilidad
“No puedes mejorar lo que no puedes medir”
Los Tres Pilares de la Observabilidad
Un sistema observable permite entender su comportamiento interno a través de:
- Métricas: Valores numéricos agregados (contadores, histogramas, gauges)
- Logs: Registros textuales de eventos del sistema
- Traces: Seguimiento de operaciones a través de múltiples servicios
Para Event Sourcing, necesitamos métricas específicas que reflejen la salud del Event Store, proyecciones, y agregados.
Métricas Clave
Event Store
Estas métricas indican la salud del almacenamiento de eventos:
| Métrica | Descripción | Alerta |
|---|---|---|
events_appended_total | Total de eventos escritos | - |
events_append_duration_ms | Latencia de escritura | > 100ms |
stream_length | Eventos por stream | > 10,000 (considerar snapshots) |
global_position | Posición global actual | - |
concurrency_errors_total | Conflictos de concurrencia | > 10/min (posible contención) |
Proyecciones
Estas métricas indican si las proyecciones mantienen el ritmo con el Event Store:
| Métrica | Descripción | Alerta |
|---|---|---|
projection_lag | Eventos pendientes de procesar | > 1000 (proyección retrasada) |
projection_events_processed | Eventos procesados total | - |
projection_processing_duration_ms | Tiempo por evento | > 50ms (cuello de botella) |
projection_errors_total | Errores de proyección | > 0 (investigar inmediatamente) |
Implementación de Métricas
Usamos Prometheus como sistema de métricas (estándar de facto en Kubernetes). Las métricas se exponen en formato texto que Prometheus scrapea periódicamente.
Tipos de métricas:
- Counter: Solo incrementa (eventos totales)
- Gauge: Sube y baja (lag actual)
- Histogram: Distribución de valores (latencias)
// src/infrastructure/monitoring/metrics.ts
import { Registry, Counter, Histogram, Gauge } from 'prom-client';
export const registry = new Registry();
// Event Store metrics
export const eventsAppendedCounter = new Counter({
name: 'event_store_events_appended_total',
help: 'Total events appended to event store',
labelNames: ['stream_type', 'event_type'],
registers: [registry]
});
export const appendDurationHistogram = new Histogram({
name: 'event_store_append_duration_seconds',
help: 'Duration of append operations',
labelNames: ['stream_type'],
buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1],
registers: [registry]
});
export const concurrencyErrorsCounter = new Counter({
name: 'event_store_concurrency_errors_total',
help: 'Total concurrency conflicts',
labelNames: ['stream_type'],
registers: [registry]
});
// Projection metrics
export const projectionLagGauge = new Gauge({
name: 'projection_lag_events',
help: 'Number of events behind global position',
labelNames: ['projection_name'],
registers: [registry]
});
export const projectionProcessedCounter = new Counter({
name: 'projection_events_processed_total',
help: 'Total events processed by projection',
labelNames: ['projection_name', 'event_type'],
registers: [registry]
});
// Agregados
export const aggregateLoadDuration = new Histogram({
name: 'aggregate_load_duration_seconds',
help: 'Duration to load and rehydrate aggregate',
labelNames: ['aggregate_type', 'used_snapshot'],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 5],
registers: [registry]
});
Event Store Instrumentado
// src/infrastructure/event-store/instrumented-event-store.ts
import {
eventsAppendedCounter,
appendDurationHistogram,
concurrencyErrorsCounter
} from '../monitoring/metrics';
export class InstrumentedEventStore implements EventStore {
constructor(private inner: EventStore) {}
async append(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<AppendResult> {
const streamType = this.extractStreamType(streamId);
const timer = appendDurationHistogram.startTimer({ stream_type: streamType });
try {
const result = await this.inner.append(streamId, events, expectedVersion);
// Registrar métricas de éxito
events.forEach(event => {
eventsAppendedCounter.inc({
stream_type: streamType,
event_type: event.eventType
});
});
return result;
} catch (error) {
if (error instanceof ConcurrencyError) {
concurrencyErrorsCounter.inc({ stream_type: streamType });
}
throw error;
} finally {
timer();
}
}
private extractStreamType(streamId: string): string {
return streamId.split('-')[0] ?? 'unknown';
}
// Delegar otros métodos...
async readStream(streamId: string): Promise<StoredEvent[]> {
return this.inner.readStream(streamId);
}
}
Logging Estructurado
El logging estructurado emite logs como objetos JSON en lugar de texto plano. Esto permite:
- Búsqueda por campos específicos (correlation_id, event_type)
- Agregaciones y estadísticas en herramientas como Elasticsearch
- Parseo automático sin expresiones regulares frágiles
// src/infrastructure/logging/logger.ts
import pino from 'pino';
export const logger = pino({
level: process.env.LOG_LEVEL ?? 'info',
formatters: {
level: (label) => ({ level: label })
},
base: {
service: 'orderflow',
version: process.env.APP_VERSION ?? '1.0.0'
}
});
// Logging de eventos
export function logEvent(event: DomainEvent, action: 'appended' | 'processed'): void {
logger.info({
event_id: event.eventId,
event_type: event.eventType,
aggregate_id: event.aggregateId,
aggregate_type: event.aggregateType,
version: event.version,
correlation_id: event.metadata.correlationId,
action
}, `Event ${action}: ${event.eventType}`);
}
// Logging de comandos
export function logCommand(
commandName: string,
aggregateId: string,
correlationId: string,
duration: number,
success: boolean
): void {
logger.info({
command: commandName,
aggregate_id: aggregateId,
correlation_id: correlationId,
duration_ms: duration,
success
}, `Command executed: ${commandName}`);
}
Distributed Tracing
Distributed Tracing permite seguir una operación a través de múltiples servicios. Cada operación crea un span con:
- Nombre de la operación
- Timestamps de inicio y fin
- Atributos (metadata)
- Relación padre-hijo con otros spans
OpenTelemetry es el estándar de facto para tracing.
// src/infrastructure/tracing/tracer.ts
import { trace, SpanKind, context } from '@opentelemetry/api';
const tracer = trace.getTracer('orderflow');
export function traceCommand<T>(
commandName: string,
aggregateId: string,
fn: () => Promise<T>
): Promise<T> {
return tracer.startActiveSpan(
`command.${commandName}`,
{
kind: SpanKind.INTERNAL,
attributes: {
'command.name': commandName,
'aggregate.id': aggregateId
}
},
async (span) => {
try {
const result = await fn();
span.setStatus({ code: 0 });
return result;
} catch (error) {
span.setStatus({ code: 2, message: (error as Error).message });
throw error;
} finally {
span.end();
}
}
);
}
export function traceProjection<T>(
projectionName: string,
eventType: string,
fn: () => Promise<T>
): Promise<T> {
return tracer.startActiveSpan(
`projection.${projectionName}.${eventType}`,
{
kind: SpanKind.CONSUMER,
attributes: {
'projection.name': projectionName,
'event.type': eventType
}
},
async (span) => {
try {
const result = await fn();
span.setStatus({ code: 0 });
return result;
} catch (error) {
span.setStatus({ code: 2, message: (error as Error).message });
throw error;
} finally {
span.end();
}
}
);
}
Health Checks
Los health checks permiten que orquestadores (Kubernetes) y load balancers determinen si un servicio puede recibir tráfico:
- /health/live: ¿El proceso está vivo? (liveness)
- /health/ready: ¿Puede procesar requests? (readiness)
// src/api/health.ts
import { Hono } from 'hono';
interface HealthStatus {
status: 'healthy' | 'degraded' | 'unhealthy';
checks: Record<string, CheckResult>;
}
interface CheckResult {
status: 'pass' | 'fail';
message?: string;
duration_ms?: number;
}
export function createHealthRoutes(deps: {
eventStore: EventStore;
db: Database;
}) {
const app = new Hono();
app.get('/health', async (c) => {
const checks: Record<string, CheckResult> = {};
// Check Event Store
const esStart = Date.now();
try {
await deps.eventStore.readAll(0n, 1);
checks.event_store = {
status: 'pass',
duration_ms: Date.now() - esStart
};
} catch (error) {
checks.event_store = {
status: 'fail',
message: (error as Error).message
};
}
// Check Database
const dbStart = Date.now();
try {
await deps.db.execute(sql`SELECT 1`);
checks.database = {
status: 'pass',
duration_ms: Date.now() - dbStart
};
} catch (error) {
checks.database = {
status: 'fail',
message: (error as Error).message
};
}
const allPassing = Object.values(checks).every(c => c.status === 'pass');
const status: HealthStatus = {
status: allPassing ? 'healthy' : 'unhealthy',
checks
};
return c.json(status, allPassing ? 200 : 503);
});
app.get('/health/live', (c) => c.json({ status: 'ok' }));
app.get('/health/ready', async (c) => {
// Verificar que las proyecciones estén al día
const lag = await getProjectionLag();
if (lag > 1000) {
return c.json({ status: 'not ready', lag }, 503);
}
return c.json({ status: 'ready', lag });
});
return app;
}
Dashboard de Métricas
// Queries útiles para Grafana/Prometheus
// Tasa de eventos por minuto
// rate(event_store_events_appended_total[1m])
// Latencia p99 de append
// histogram_quantile(0.99, rate(event_store_append_duration_seconds_bucket[5m]))
// Lag de proyecciones
// projection_lag_events
// Errores de concurrencia por minuto
// rate(event_store_concurrency_errors_total[1m])
Alertas
# alerts.yml
groups:
- name: event-sourcing
rules:
- alert: HighProjectionLag
expr: projection_lag_events > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "Projection lag is high"
- alert: HighConcurrencyErrors
expr: rate(event_store_concurrency_errors_total[5m]) > 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "High rate of concurrency conflicts"
- alert: SlowEventAppend
expr: histogram_quantile(0.99, rate(event_store_append_duration_seconds_bucket[5m])) > 0.5
for: 5m
labels:
severity: critical
annotations:
summary: "Event append latency is too high"
Resumen
- Las métricas permiten entender el comportamiento del sistema
- El logging estructurado facilita debugging
- El tracing distribuido conecta operaciones relacionadas
- Los health checks indican disponibilidad del servicio
- Las alertas notifican problemas proactivamente
Glosario
Observabilidad
Definición: Capacidad de entender el estado interno de un sistema a partir de sus outputs externos (métricas, logs, traces).
Por qué es importante: Permite diagnosticar problemas en producción sin acceso directo al sistema ni capacidad de reproducir el bug.
Ejemplo práctico: Un spike en projection_lag combinado con logs de “connection timeout” y trace mostrando latencia en DB permite diagnosticar problema de red.
Prometheus
Definición: Sistema de monitoreo y alertas open-source que recolecta métricas via HTTP pull (scraping) y las almacena en serie temporal.
Por qué es importante: Estándar de facto para métricas en ecosistemas cloud-native y Kubernetes.
Ejemplo práctico: Prometheus scrapea /metrics cada 15s, almacena events_appended_total, y Grafana lo visualiza como gráfico de tasa de eventos/minuto.
Counter (Métrica)
Definición: Métrica que solo puede incrementar (o resetearse a cero). Representa valores acumulativos.
Por qué es importante: Ideal para contar eventos, errores, requests. La tasa de cambio (rate()) muestra eventos/segundo.
Ejemplo práctico: events_appended_total{event_type="OrderCreated"} incrementa con cada orden; rate(...[5m]) muestra órdenes/segundo.
Histogram (Métrica)
Definición: Métrica que agrupa observaciones en buckets predefinidos, permitiendo calcular percentiles.
Por qué es importante: Captura la distribución de latencias, no solo el promedio. El p99 revela casos extremos que el promedio oculta.
Ejemplo práctico: histogram_quantile(0.99, rate(append_duration_bucket[5m])) devuelve la latencia que el 99% de appends no excede.
Gauge (Métrica)
Definición: Métrica que puede subir y bajar, representando un valor instantáneo.
Por qué es importante: Ideal para valores que fluctúan como conexiones activas, memoria usada, o projection lag.
Ejemplo práctico: projection_lag_events{projection="orders"} muestra cuántos eventos faltan procesar ahora mismo.
Structured Logging
Definición: Logs emitidos como objetos estructurados (JSON) en lugar de texto plano.
Por qué es importante: Permite búsqueda, filtrado y agregación por campos específicos en sistemas como Elasticsearch o Loki.
Ejemplo práctico: {"level":"info","event_id":"abc","event_type":"OrderCreated","correlation_id":"xyz"} permite buscar todos los logs de una transacción por correlation_id.
Distributed Tracing
Definición: Técnica para seguir una operación a través de múltiples servicios, conectando logs y métricas de cada componente.
Por qué es importante: En microservicios, una request atraviesa múltiples servicios; el trace muestra dónde está la latencia.
Ejemplo práctico: Trace de “crear orden” muestra: API (5ms) -> Agregado (2ms) -> EventStore (50ms) -> Projection (10ms), identificando que EventStore es el cuello de botella.
Span
Definición: Unidad de trabajo en distributed tracing, con nombre, timestamps, y metadata. Los spans se anidan formando un árbol.
Por qué es importante: Cada span representa una operación con su duración; la estructura jerárquica muestra qué operaciones llaman a otras.
Ejemplo práctico: Span “command.ConfirmOrder” contiene child spans “load_aggregate”, “validate”, “append_events”, cada uno con su duración.
Liveness vs Readiness
Definición: Liveness indica si el proceso está vivo (debe reiniciarse si falla); Readiness indica si puede procesar tráfico (se excluye del load balancer si falla).
Por qué es importante: Un servicio puede estar vivo pero no listo (conectando a DB, cargando cache); el tráfico no debe enviarse hasta que esté ready.
Ejemplo práctico: Servicio arranca (live), carga proyecciones durante 30s (not ready), completa carga (ready), empieza a recibir tráfico.