← Volver al listado de tecnologías

Capítulo 22: Monitoreo y Observabilidad

Por: SiempreListo
event-sourcingmonitoreoobservabilidadmétricas

Capítulo 22: Monitoreo y Observabilidad

“No puedes mejorar lo que no puedes medir”

Los Tres Pilares de la Observabilidad

Un sistema observable permite entender su comportamiento interno a través de:

  1. Métricas: Valores numéricos agregados (contadores, histogramas, gauges)
  2. Logs: Registros textuales de eventos del sistema
  3. Traces: Seguimiento de operaciones a través de múltiples servicios

Para Event Sourcing, necesitamos métricas específicas que reflejen la salud del Event Store, proyecciones, y agregados.

Métricas Clave

Event Store

Estas métricas indican la salud del almacenamiento de eventos:

MétricaDescripciónAlerta
events_appended_totalTotal de eventos escritos-
events_append_duration_msLatencia de escritura> 100ms
stream_lengthEventos por stream> 10,000 (considerar snapshots)
global_positionPosición global actual-
concurrency_errors_totalConflictos de concurrencia> 10/min (posible contención)

Proyecciones

Estas métricas indican si las proyecciones mantienen el ritmo con el Event Store:

MétricaDescripciónAlerta
projection_lagEventos pendientes de procesar> 1000 (proyección retrasada)
projection_events_processedEventos procesados total-
projection_processing_duration_msTiempo por evento> 50ms (cuello de botella)
projection_errors_totalErrores de proyección> 0 (investigar inmediatamente)

Implementación de Métricas

Usamos Prometheus como sistema de métricas (estándar de facto en Kubernetes). Las métricas se exponen en formato texto que Prometheus scrapea periódicamente.

Tipos de métricas:

// src/infrastructure/monitoring/metrics.ts
import { Registry, Counter, Histogram, Gauge } from 'prom-client';

export const registry = new Registry();

// Event Store metrics
export const eventsAppendedCounter = new Counter({
  name: 'event_store_events_appended_total',
  help: 'Total events appended to event store',
  labelNames: ['stream_type', 'event_type'],
  registers: [registry]
});

export const appendDurationHistogram = new Histogram({
  name: 'event_store_append_duration_seconds',
  help: 'Duration of append operations',
  labelNames: ['stream_type'],
  buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1],
  registers: [registry]
});

export const concurrencyErrorsCounter = new Counter({
  name: 'event_store_concurrency_errors_total',
  help: 'Total concurrency conflicts',
  labelNames: ['stream_type'],
  registers: [registry]
});

// Projection metrics
export const projectionLagGauge = new Gauge({
  name: 'projection_lag_events',
  help: 'Number of events behind global position',
  labelNames: ['projection_name'],
  registers: [registry]
});

export const projectionProcessedCounter = new Counter({
  name: 'projection_events_processed_total',
  help: 'Total events processed by projection',
  labelNames: ['projection_name', 'event_type'],
  registers: [registry]
});

// Agregados
export const aggregateLoadDuration = new Histogram({
  name: 'aggregate_load_duration_seconds',
  help: 'Duration to load and rehydrate aggregate',
  labelNames: ['aggregate_type', 'used_snapshot'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 5],
  registers: [registry]
});

Event Store Instrumentado

// src/infrastructure/event-store/instrumented-event-store.ts
import {
  eventsAppendedCounter,
  appendDurationHistogram,
  concurrencyErrorsCounter
} from '../monitoring/metrics';

export class InstrumentedEventStore implements EventStore {
  constructor(private inner: EventStore) {}

  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<AppendResult> {
    const streamType = this.extractStreamType(streamId);
    const timer = appendDurationHistogram.startTimer({ stream_type: streamType });

    try {
      const result = await this.inner.append(streamId, events, expectedVersion);

      // Registrar métricas de éxito
      events.forEach(event => {
        eventsAppendedCounter.inc({
          stream_type: streamType,
          event_type: event.eventType
        });
      });

      return result;
    } catch (error) {
      if (error instanceof ConcurrencyError) {
        concurrencyErrorsCounter.inc({ stream_type: streamType });
      }
      throw error;
    } finally {
      timer();
    }
  }

  private extractStreamType(streamId: string): string {
    return streamId.split('-')[0] ?? 'unknown';
  }

  // Delegar otros métodos...
  async readStream(streamId: string): Promise<StoredEvent[]> {
    return this.inner.readStream(streamId);
  }
}

Logging Estructurado

El logging estructurado emite logs como objetos JSON en lugar de texto plano. Esto permite:

// src/infrastructure/logging/logger.ts
import pino from 'pino';

export const logger = pino({
  level: process.env.LOG_LEVEL ?? 'info',
  formatters: {
    level: (label) => ({ level: label })
  },
  base: {
    service: 'orderflow',
    version: process.env.APP_VERSION ?? '1.0.0'
  }
});

// Logging de eventos
export function logEvent(event: DomainEvent, action: 'appended' | 'processed'): void {
  logger.info({
    event_id: event.eventId,
    event_type: event.eventType,
    aggregate_id: event.aggregateId,
    aggregate_type: event.aggregateType,
    version: event.version,
    correlation_id: event.metadata.correlationId,
    action
  }, `Event ${action}: ${event.eventType}`);
}

// Logging de comandos
export function logCommand(
  commandName: string,
  aggregateId: string,
  correlationId: string,
  duration: number,
  success: boolean
): void {
  logger.info({
    command: commandName,
    aggregate_id: aggregateId,
    correlation_id: correlationId,
    duration_ms: duration,
    success
  }, `Command executed: ${commandName}`);
}

Distributed Tracing

Distributed Tracing permite seguir una operación a través de múltiples servicios. Cada operación crea un span con:

OpenTelemetry es el estándar de facto para tracing.

// src/infrastructure/tracing/tracer.ts
import { trace, SpanKind, context } from '@opentelemetry/api';

const tracer = trace.getTracer('orderflow');

export function traceCommand<T>(
  commandName: string,
  aggregateId: string,
  fn: () => Promise<T>
): Promise<T> {
  return tracer.startActiveSpan(
    `command.${commandName}`,
    {
      kind: SpanKind.INTERNAL,
      attributes: {
        'command.name': commandName,
        'aggregate.id': aggregateId
      }
    },
    async (span) => {
      try {
        const result = await fn();
        span.setStatus({ code: 0 });
        return result;
      } catch (error) {
        span.setStatus({ code: 2, message: (error as Error).message });
        throw error;
      } finally {
        span.end();
      }
    }
  );
}

export function traceProjection<T>(
  projectionName: string,
  eventType: string,
  fn: () => Promise<T>
): Promise<T> {
  return tracer.startActiveSpan(
    `projection.${projectionName}.${eventType}`,
    {
      kind: SpanKind.CONSUMER,
      attributes: {
        'projection.name': projectionName,
        'event.type': eventType
      }
    },
    async (span) => {
      try {
        const result = await fn();
        span.setStatus({ code: 0 });
        return result;
      } catch (error) {
        span.setStatus({ code: 2, message: (error as Error).message });
        throw error;
      } finally {
        span.end();
      }
    }
  );
}

Health Checks

Los health checks permiten que orquestadores (Kubernetes) y load balancers determinen si un servicio puede recibir tráfico:

// src/api/health.ts
import { Hono } from 'hono';

interface HealthStatus {
  status: 'healthy' | 'degraded' | 'unhealthy';
  checks: Record<string, CheckResult>;
}

interface CheckResult {
  status: 'pass' | 'fail';
  message?: string;
  duration_ms?: number;
}

export function createHealthRoutes(deps: {
  eventStore: EventStore;
  db: Database;
}) {
  const app = new Hono();

  app.get('/health', async (c) => {
    const checks: Record<string, CheckResult> = {};

    // Check Event Store
    const esStart = Date.now();
    try {
      await deps.eventStore.readAll(0n, 1);
      checks.event_store = {
        status: 'pass',
        duration_ms: Date.now() - esStart
      };
    } catch (error) {
      checks.event_store = {
        status: 'fail',
        message: (error as Error).message
      };
    }

    // Check Database
    const dbStart = Date.now();
    try {
      await deps.db.execute(sql`SELECT 1`);
      checks.database = {
        status: 'pass',
        duration_ms: Date.now() - dbStart
      };
    } catch (error) {
      checks.database = {
        status: 'fail',
        message: (error as Error).message
      };
    }

    const allPassing = Object.values(checks).every(c => c.status === 'pass');
    const status: HealthStatus = {
      status: allPassing ? 'healthy' : 'unhealthy',
      checks
    };

    return c.json(status, allPassing ? 200 : 503);
  });

  app.get('/health/live', (c) => c.json({ status: 'ok' }));

  app.get('/health/ready', async (c) => {
    // Verificar que las proyecciones estén al día
    const lag = await getProjectionLag();
    if (lag > 1000) {
      return c.json({ status: 'not ready', lag }, 503);
    }
    return c.json({ status: 'ready', lag });
  });

  return app;
}

Dashboard de Métricas

// Queries útiles para Grafana/Prometheus

// Tasa de eventos por minuto
// rate(event_store_events_appended_total[1m])

// Latencia p99 de append
// histogram_quantile(0.99, rate(event_store_append_duration_seconds_bucket[5m]))

// Lag de proyecciones
// projection_lag_events

// Errores de concurrencia por minuto
// rate(event_store_concurrency_errors_total[1m])

Alertas

# alerts.yml
groups:
  - name: event-sourcing
    rules:
      - alert: HighProjectionLag
        expr: projection_lag_events > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Projection lag is high"

      - alert: HighConcurrencyErrors
        expr: rate(event_store_concurrency_errors_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High rate of concurrency conflicts"

      - alert: SlowEventAppend
        expr: histogram_quantile(0.99, rate(event_store_append_duration_seconds_bucket[5m])) > 0.5
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Event append latency is too high"

Resumen

Glosario

Observabilidad

Definición: Capacidad de entender el estado interno de un sistema a partir de sus outputs externos (métricas, logs, traces).

Por qué es importante: Permite diagnosticar problemas en producción sin acceso directo al sistema ni capacidad de reproducir el bug.

Ejemplo práctico: Un spike en projection_lag combinado con logs de “connection timeout” y trace mostrando latencia en DB permite diagnosticar problema de red.


Prometheus

Definición: Sistema de monitoreo y alertas open-source que recolecta métricas via HTTP pull (scraping) y las almacena en serie temporal.

Por qué es importante: Estándar de facto para métricas en ecosistemas cloud-native y Kubernetes.

Ejemplo práctico: Prometheus scrapea /metrics cada 15s, almacena events_appended_total, y Grafana lo visualiza como gráfico de tasa de eventos/minuto.


Counter (Métrica)

Definición: Métrica que solo puede incrementar (o resetearse a cero). Representa valores acumulativos.

Por qué es importante: Ideal para contar eventos, errores, requests. La tasa de cambio (rate()) muestra eventos/segundo.

Ejemplo práctico: events_appended_total{event_type="OrderCreated"} incrementa con cada orden; rate(...[5m]) muestra órdenes/segundo.


Histogram (Métrica)

Definición: Métrica que agrupa observaciones en buckets predefinidos, permitiendo calcular percentiles.

Por qué es importante: Captura la distribución de latencias, no solo el promedio. El p99 revela casos extremos que el promedio oculta.

Ejemplo práctico: histogram_quantile(0.99, rate(append_duration_bucket[5m])) devuelve la latencia que el 99% de appends no excede.


Gauge (Métrica)

Definición: Métrica que puede subir y bajar, representando un valor instantáneo.

Por qué es importante: Ideal para valores que fluctúan como conexiones activas, memoria usada, o projection lag.

Ejemplo práctico: projection_lag_events{projection="orders"} muestra cuántos eventos faltan procesar ahora mismo.


Structured Logging

Definición: Logs emitidos como objetos estructurados (JSON) en lugar de texto plano.

Por qué es importante: Permite búsqueda, filtrado y agregación por campos específicos en sistemas como Elasticsearch o Loki.

Ejemplo práctico: {"level":"info","event_id":"abc","event_type":"OrderCreated","correlation_id":"xyz"} permite buscar todos los logs de una transacción por correlation_id.


Distributed Tracing

Definición: Técnica para seguir una operación a través de múltiples servicios, conectando logs y métricas de cada componente.

Por qué es importante: En microservicios, una request atraviesa múltiples servicios; el trace muestra dónde está la latencia.

Ejemplo práctico: Trace de “crear orden” muestra: API (5ms) -> Agregado (2ms) -> EventStore (50ms) -> Projection (10ms), identificando que EventStore es el cuello de botella.


Span

Definición: Unidad de trabajo en distributed tracing, con nombre, timestamps, y metadata. Los spans se anidan formando un árbol.

Por qué es importante: Cada span representa una operación con su duración; la estructura jerárquica muestra qué operaciones llaman a otras.

Ejemplo práctico: Span “command.ConfirmOrder” contiene child spans “load_aggregate”, “validate”, “append_events”, cada uno con su duración.


Liveness vs Readiness

Definición: Liveness indica si el proceso está vivo (debe reiniciarse si falla); Readiness indica si puede procesar tráfico (se excluye del load balancer si falla).

Por qué es importante: Un servicio puede estar vivo pero no listo (conectando a DB, cargando cache); el tráfico no debe enviarse hasta que esté ready.

Ejemplo práctico: Servicio arranca (live), carga proyecciones durante 30s (not ready), completa carga (ready), empieza a recibir tráfico.


← Capítulo 21: Testing | Capítulo 23: Deployment →