← Volver al listado de tecnologías

Monitoreo y Métricas

Por: SiempreListo
cqrsmonitoreoprometheusgrafanaobservabilidad

Capítulo 23: Monitoreo y Métricas

La observabilidad es la capacidad de entender el estado interno de un sistema a través de sus outputs externos. En sistemas CQRS, donde comandos, eventos y queries fluyen asíncronamente, el monitoreo es esencial para detectar problemas y optimizar rendimiento.

Los Tres Pilares de la Observabilidad

En CQRS, monitoreamos el Write Side, Read Side y proyecciones por separado.

Métricas del Command Side

Prometheus es un sistema de monitoreo que recolecta métricas via HTTP scraping. Las aplicaciones exponen métricas en un endpoint /metrics.

Counter es una métrica que solo incrementa (total de comandos, errores). Histogram registra distribuciones de valores (latencias).

labelNames permiten segmentar métricas por dimensiones (tipo de comando, status).

// src/infrastructure/metrics/command-metrics.ts
import { Counter, Histogram, Registry } from 'prom-client';

export class CommandMetrics {
  private commandsTotal: Counter;
  private commandDuration: Histogram;
  private commandErrors: Counter;

  constructor(registry: Registry) {
    this.commandsTotal = new Counter({
      name: 'cqrs_commands_total',
      help: 'Total commands processed',
      labelNames: ['command_type', 'status'],
      registers: [registry]
    });

    this.commandDuration = new Histogram({
      name: 'cqrs_command_duration_seconds',
      help: 'Command processing duration',
      labelNames: ['command_type'],
      buckets: [0.01, 0.05, 0.1, 0.5, 1, 5],
      registers: [registry]
    });

    this.commandErrors = new Counter({
      name: 'cqrs_command_errors_total',
      help: 'Total command errors',
      labelNames: ['command_type', 'error_type'],
      registers: [registry]
    });
  }

  recordCommand(type: string, durationMs: number, success: boolean): void {
    this.commandsTotal.inc({ command_type: type, status: success ? 'success' : 'failure' });
    this.commandDuration.observe({ command_type: type }, durationMs / 1000);
  }

  recordError(type: string, errorType: string): void {
    this.commandErrors.inc({ command_type: type, error_type: errorType });
  }
}

Métricas del Query Side

El Read Side tiene métricas diferentes: interesa especialmente la tasa de aciertos de caché (cache hit rate), que indica qué porcentaje de queries se sirven desde caché.

Una tasa baja indica que el caché no está siendo efectivo (TTL muy corto, invalidación excesiva).

// src/infrastructure/metrics/query-metrics.ts
export class QueryMetrics {
  private queriesTotal: Counter;
  private queryDuration: Histogram;
  private cacheHits: Counter;
  private cacheMisses: Counter;

  constructor(registry: Registry) {
    this.queriesTotal = new Counter({
      name: 'cqrs_queries_total',
      help: 'Total queries processed',
      labelNames: ['query_type'],
      registers: [registry]
    });

    this.queryDuration = new Histogram({
      name: 'cqrs_query_duration_seconds',
      help: 'Query processing duration',
      labelNames: ['query_type', 'cache_status'],
      buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5],
      registers: [registry]
    });

    this.cacheHits = new Counter({
      name: 'cqrs_cache_hits_total',
      help: 'Cache hits',
      labelNames: ['query_type'],
      registers: [registry]
    });

    this.cacheMisses = new Counter({
      name: 'cqrs_cache_misses_total',
      help: 'Cache misses',
      labelNames: ['query_type'],
      registers: [registry]
    });
  }

  recordCacheHit(queryType: string): void {
    this.cacheHits.inc({ query_type: queryType });
  }

  recordCacheMiss(queryType: string): void {
    this.cacheMisses.inc({ query_type: queryType });
  }
}

Métricas de Proyecciones

El projection lag (retraso de proyección) es crítico: indica cuántos eventos faltan por procesar. Un lag creciente significa que las proyecciones no pueden seguir el ritmo del Write Side.

Gauge es una métrica que puede subir y bajar (a diferencia de Counter que solo sube). Ideal para valores que fluctuan como lag o conexiones activas.

// src/infrastructure/metrics/projection-metrics.ts
export class ProjectionMetrics {
  private projectionLag: Gauge;
  private eventsProcessed: Counter;
  private projectionErrors: Counter;

  constructor(registry: Registry) {
    this.projectionLag = new Gauge({
      name: 'cqrs_projection_lag_events',
      help: 'Number of events behind',
      labelNames: ['projection_name'],
      registers: [registry]
    });

    this.eventsProcessed = new Counter({
      name: 'cqrs_projection_events_total',
      help: 'Events processed by projections',
      labelNames: ['projection_name', 'event_type'],
      registers: [registry]
    });

    this.projectionErrors = new Counter({
      name: 'cqrs_projection_errors_total',
      help: 'Projection processing errors',
      labelNames: ['projection_name'],
      registers: [registry]
    });
  }

  setLag(projectionName: string, lag: number): void {
    this.projectionLag.set({ projection_name: projectionName }, lag);
  }
}

Middleware de Instrumentación

El patrón Decorator/Wrapper envuelve el bus original para agregar instrumentación sin modificar la lógica de negocio.

Este middleware mide la duración de cada comando, registra métricas de éxito/fallo, y loguea información útil para debugging.

// src/command/instrumented-bus.ts
export class InstrumentedCommandBus implements CommandBus {
  constructor(
    private inner: CommandBus,
    private metrics: CommandMetrics,
    private logger: Logger
  ) {}

  async dispatch(command: Command): Promise<void> {
    const start = Date.now();
    const commandType = command.constructor.name;

    try {
      await this.inner.dispatch(command);
      this.metrics.recordCommand(commandType, Date.now() - start, true);
      this.logger.info({ commandType, duration: Date.now() - start }, 'Command processed');
    } catch (error) {
      this.metrics.recordCommand(commandType, Date.now() - start, false);
      this.metrics.recordError(commandType, error.name);
      this.logger.error({ commandType, error }, 'Command failed');
      throw error;
    }
  }
}

Dashboard Grafana

Grafana es una plataforma de visualización que se conecta a Prometheus para crear dashboards y alertas.

Las queries usan PromQL (Prometheus Query Language):

{
  "panels": [
    {
      "title": "Commands per Second",
      "type": "graph",
      "targets": [{
        "expr": "rate(cqrs_commands_total[5m])",
        "legendFormat": "{{command_type}}"
      }]
    },
    {
      "title": "Command Latency P99",
      "type": "graph",
      "targets": [{
        "expr": "histogram_quantile(0.99, rate(cqrs_command_duration_seconds_bucket[5m]))",
        "legendFormat": "{{command_type}}"
      }]
    },
    {
      "title": "Cache Hit Rate",
      "type": "stat",
      "targets": [{
        "expr": "sum(rate(cqrs_cache_hits_total[5m])) / (sum(rate(cqrs_cache_hits_total[5m])) + sum(rate(cqrs_cache_misses_total[5m])))"
      }]
    },
    {
      "title": "Projection Lag",
      "type": "graph",
      "targets": [{
        "expr": "cqrs_projection_lag_events",
        "legendFormat": "{{projection_name}}"
      }]
    }
  ]
}

Health Checks

Los health checks verifican que todos los componentes del sistema estén funcionando. Son usados por load balancers y orquestadores para decidir si enviar tráfico a una instancia.

Un sistema CQRS necesita verificar: base de datos de escritura, base de datos de lectura, caché, y bus de eventos.

// src/infrastructure/health/health-checker.ts
export class CQRSHealthChecker {
  constructor(
    private writeDb: Database,
    private readDb: ElasticsearchClient,
    private cache: Redis,
    private eventBus: EventBus
  ) {}

  async check(): Promise<HealthStatus> {
    const checks = await Promise.all([
      this.checkWriteDb(),
      this.checkReadDb(),
      this.checkCache(),
      this.checkEventBus()
    ]);

    return {
      status: checks.every(c => c.healthy) ? 'healthy' : 'unhealthy',
      checks
    };
  }

  private async checkWriteDb(): Promise<ComponentHealth> {
    try {
      await this.writeDb.query('SELECT 1');
      return { name: 'write-db', healthy: true };
    } catch {
      return { name: 'write-db', healthy: false };
    }
  }
}

Alertas

Las alertas notifican automáticamente cuando métricas cruzan umbrales críticos. Se definen en YAML con condiciones PromQL.

for indica cuánto tiempo debe mantenerse la condición antes de disparar la alerta (evita falsos positivos por picos momentáneos).

severity clasifica la urgencia: critical requiere acción inmediata, warning puede esperar.

# alerts.yml
groups:
  - name: cqrs-alerts
    rules:
      - alert: HighCommandLatency
        expr: histogram_quantile(0.99, rate(cqrs_command_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Command latency is high"

      - alert: ProjectionLagHigh
        expr: cqrs_projection_lag_events > 1000
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Projection {{ $labels.projection_name }} is behind"

      - alert: LowCacheHitRate
        expr: |
          sum(rate(cqrs_cache_hits_total[5m])) /
          (sum(rate(cqrs_cache_hits_total[5m])) + sum(rate(cqrs_cache_misses_total[5m]))) < 0.7
        for: 15m
        labels:
          severity: warning

Resumen

Monitoreo CQRS:

Glosario

Observabilidad

Definición: Capacidad de inferir el estado interno de un sistema basándose en sus outputs externos (métricas, logs, trazas).

Por qué es importante: Permite diagnosticar problemas en producción sin acceso directo al sistema. Esencial para sistemas distribuidos donde los fallos son difíciles de reproducir.

Ejemplo práctico: Un pico en latencia de comandos + aumento de errores de DB permite inferir que la base de datos está sobrecargada, sin necesidad de conectarse a ella.


Prometheus

Definición: Sistema de monitoreo y alertas de código abierto. Recolecta métricas via HTTP scraping y las almacena en una base de datos de series temporales.

Por qué es importante: Es el estándar de facto para monitoreo en Kubernetes y microservicios. Ecosistema rico de exporters y integraciones.

Ejemplo práctico: La aplicación expone métricas en /metrics. Prometheus las scrapea cada 15 segundos y las almacena para consultas y alertas.


Counter

Definición: Tipo de métrica de Prometheus que solo puede incrementar (o resetearse a cero). Ideal para contar eventos totales.

Por qué es importante: Permite calcular tasas (requests/segundo) usando rate(). El valor absoluto muestra totales históricos.

Ejemplo práctico: cqrs_commands_total cuenta todos los comandos. rate(cqrs_commands_total[5m]) muestra comandos por segundo en los últimos 5 minutos.


Histogram

Definición: Tipo de métrica de Prometheus que registra observaciones en buckets predefinidos, permitiendo calcular percentiles y distribuciones.

Por qué es importante: Un promedio de latencia esconde outliers. Los histogramas revelan la distribución real (P50, P95, P99).

Ejemplo práctico: El percentil 99 (P99) de latencia muestra la experiencia del 1% peor de usuarios. Si P99 es 2s pero promedio es 100ms, hay un problema para algunos usuarios.


Gauge

Definición: Tipo de métrica de Prometheus que puede subir y bajar. Representa valores instantáneos como temperatura, conexiones activas, o lag.

Por qué es importante: Captura estado actual, no acumulados. Ideal para métricas que fluctuan naturalmente.

Ejemplo práctico: cqrs_projection_lag_events es un gauge porque el lag puede aumentar (eventos acumulándose) o disminuir (proyección procesando).


Grafana

Definición: Plataforma de visualización y análisis que se conecta a múltiples fuentes de datos (Prometheus, Elasticsearch, etc.) para crear dashboards y alertas.

Por qué es importante: Proporciona visualización rica de métricas. Los dashboards permiten ver el estado del sistema de un vistazo.

Ejemplo práctico: Un dashboard muestra gráficos de latencia, throughput, error rate, y lag de proyecciones, actualizándose en tiempo real.


PromQL

Definición: Prometheus Query Language. Lenguaje funcional para consultar y agregar métricas de series temporales.

Por qué es importante: Permite crear queries complejas: tasas, percentiles, agregaciones por labels, operaciones entre métricas.

Ejemplo práctico: sum(rate(http_requests_total{status="500"}[5m])) by (endpoint) muestra errores 500 por segundo agrupados por endpoint.


Health Check

Definición: Endpoint que reporta el estado de salud de un servicio, típicamente verificando conexiones a dependencias.

Por qué es importante: Load balancers y orquestadores usan health checks para decidir si enviar tráfico. Un servicio unhealthy se saca de rotación.

Ejemplo práctico: GET /health retorna {"status": "healthy", "checks": [{"name": "db", "healthy": true}]} si todo funciona, o unhealthy si algo falla.


Alerting

Definición: Sistema que evalúa condiciones sobre métricas y notifica (email, Slack, PagerDuty) cuando se cruzan umbrales.

Por qué es importante: Detecta problemas antes de que los usuarios los reporten. Permite respuesta proactiva a incidentes.

Ejemplo práctico: Si el lag de proyección supera 1000 eventos por más de 10 minutos, se envía una alerta crítica al canal de on-call.


Projection Lag

Definición: Número de eventos que una proyección tiene pendientes de procesar. La diferencia entre la posición actual del event store y la posición de la proyección.

Por qué es importante: Un lag creciente indica que el Read Side no puede seguir el ritmo del Write Side. Eventualmente causa datos obsoletos en queries.

Ejemplo práctico: Si se generan 100 eventos/segundo pero la proyección procesa 80, el lag crece 20 eventos/segundo. En una hora habrá 72,000 eventos de retraso.


← Capítulo 22: Testing | Capítulo 24: Deployment →