← Volver al listado de tecnologías

Caché con Redis

Por: SiempreListo
cqrsrediscacheperformancetypescript

Capítulo 14: Caché con Redis

Redis es una base de datos en memoria que funciona como caché de alta velocidad para el Read Side de CQRS. Al almacenar datos frecuentemente consultados en memoria, reducimos la latencia de consultas de milisegundos a microsegundos.

Por qué Redis en CQRS

En un sistema CQRS, el Read Side puede recibir miles de consultas por segundo. Aunque Elasticsearch es rápido, acceder a memoria es ordenes de magnitud más rápido que acceder a disco o red.

Redis actua como primera línea de defensa:

  1. La consulta llega al sistema
  2. Se busca en Redis (microsegundos)
  3. Si no está, se busca en Elasticsearch (milisegundos)
  4. Se guarda en Redis para futuras consultas

Configuración de Redis

ioredis es el cliente Redis más popular para Node.js. Ofrece soporte para clustering, pipelining y reconexión automática.

// src/infrastructure/redis/client.ts
import { Redis } from 'ioredis';

export const createRedisClient = (): Redis => {
  return new Redis({
    host: process.env.REDIS_HOST ?? 'localhost',
    port: Number(process.env.REDIS_PORT) ?? 6379,
    maxRetriesPerRequest: 3,
    retryDelayOnFailover: 100
  });
};

Read Model con Caché

El patrón Cache-Aside (también llamado Lazy Loading) funciona así:

  1. Intentar leer del caché
  2. Si hay hit, retornar inmediatamente
  3. Si hay miss, leer de la fuente principal
  4. Guardar en caché para próximas lecturas

TTL (Time To Live) define cuántos segundos vive un dato en caché antes de expirar automáticamente.

// src/read/cached-order.repository.ts
export class CachedOrderReadRepository {
  private readonly TTL = 3600;

  constructor(
    private redis: Redis,
    private elastic: ElasticsearchOrderRepository
  ) {}

  async findById(orderId: string): Promise<OrderReadModel | null> {
    const cached = await this.redis.get(`order:${orderId}`);
    if (cached) return JSON.parse(cached);

    const order = await this.elastic.findById(orderId);
    if (order) {
      await this.redis.setex(`order:${orderId}`, this.TTL, JSON.stringify(order));
    }
    return order;
  }

  async invalidate(orderId: string): Promise<void> {
    await this.redis.del(`order:${orderId}`);
  }
}

Caché de Listados con Sorted Sets

Los Sorted Sets de Redis son estructuras ideales para listados ordenados. Cada elemento tiene un “score” (puntuación) que determina su orden.

Usamos el timestamp como score para ordenar pedidos cronológicamente.

Pipeline agrupa múltiples comandos en una sola llamada de red, reduciendo la latencia total.

// src/read/order-list.cache.ts
export class OrderListCache {
  constructor(private redis: Redis) {}

  async cacheOrdersByCustomer(
    customerId: string,
    orders: OrderSummary[]
  ): Promise<void> {
    const key = `customer:${customerId}:orders`;
    const pipeline = this.redis.pipeline();

    pipeline.del(key);
    for (const order of orders) {
      pipeline.zadd(key, order.createdAt.getTime(), JSON.stringify(order));
    }
    pipeline.expire(key, 1800);

    await pipeline.exec();
  }

  async getRecentOrders(customerId: string, limit: number): Promise<OrderSummary[]> {
    const key = `customer:${customerId}:orders`;
    const results = await this.redis.zrevrange(key, 0, limit - 1);
    return results.map(r => JSON.parse(r));
  }
}

Invalidación por Eventos

En CQRS, el caché debe invalidarse cuando el Write Side genera eventos que modifican los datos. Creamos una proyección dedicada que escucha eventos y elimina las entradas de caché afectadas.

El método keys() con patrones wildcard (*) permite encontrar todas las claves que coinciden, util para invalidar datos relacionados.

// src/projections/cache-invalidation.projection.ts
export class CacheInvalidationProjection implements Projection<void> {
  name = 'cache-invalidation';

  constructor(private redis: Redis) {}

  async handle(event: DomainEvent): Promise<void> {
    const patterns = this.getInvalidationPatterns(event);
    if (patterns.length > 0) {
      await this.invalidatePatterns(patterns);
    }
  }

  private getInvalidationPatterns(event: DomainEvent): string[] {
    switch (event.type) {
      case 'OrderCreated':
      case 'OrderUpdated':
        return [
          `order:${event.aggregateId}`,
          `customer:${event.data.customerId}:orders`
        ];
      case 'ProductPriceChanged':
        return [`product:${event.aggregateId}:*`];
      default:
        return [];
    }
  }

  private async invalidatePatterns(patterns: string[]): Promise<void> {
    for (const pattern of patterns) {
      if (pattern.includes('*')) {
        const keys = await this.redis.keys(pattern);
        if (keys.length) await this.redis.del(...keys);
      } else {
        await this.redis.del(pattern);
      }
    }
  }
}

Query Handler con Caché

El Query Handler usa el repositorio cacheado de forma transparente. No necesita saber si el dato viene de caché o de Elasticsearch.

// src/read/handlers/get-order.handler.ts
export class GetOrderQueryHandler {
  constructor(private cache: CachedOrderReadRepository) {}

  async handle(query: GetOrderQuery): Promise<OrderReadModel> {
    const order = await this.cache.findById(query.orderId);
    if (!order) throw new OrderNotFoundError(query.orderId);
    return order;
  }
}

Estadísticas en Tiempo Real

Redis es ideal para contadores y estadísticas que cambian frecuentemente. Los comandos INCR e INCRBYFLOAT son atómicos y extremadamente rápidos.

Usamos la fecha como parte de la clave para segmentar estadísticas por día.

// src/read/realtime-stats.cache.ts
export class RealtimeStatsCache {
  constructor(private redis: Redis) {}

  async incrementOrderCount(): Promise<void> {
    const today = new Date().toISOString().split('T')[0];
    await this.redis.incr(`stats:orders:${today}`);
  }

  async addToRevenue(amount: number): Promise<void> {
    const today = new Date().toISOString().split('T')[0];
    await this.redis.incrbyfloat(`stats:revenue:${today}`, amount);
  }

  async getDailyStats(date: string): Promise<DailyStats> {
    const [orders, revenue] = await Promise.all([
      this.redis.get(`stats:orders:${date}`),
      this.redis.get(`stats:revenue:${date}`)
    ]);
    return {
      orderCount: Number(orders) || 0,
      revenue: Number(revenue) || 0
    };
  }
}

Resumen

Redis en CQRS permite:

Glosario

Redis

Definición: Base de datos en memoria de código abierto que funciona como almacén de estructuras de datos (strings, hashes, lists, sets, sorted sets).

Por qué es importante: Ofrece latencias de microsegundos para lecturas, ideal como capa de caché frente a bases de datos más lentas.

Ejemplo práctico: Almacenar el detalle de un pedido consultado frecuentemente para servirlo en 0.1ms en lugar de 10ms desde Elasticsearch.


TTL (Time To Live)

Definición: Tiempo en segundos que un dato permanece válido en caché antes de ser eliminado automáticamente.

Por qué es importante: Garantiza que los datos en caché eventualmente se refresquen, evitando servir datos obsoletos indefinidamente.

Ejemplo práctico: Un TTL de 3600 significa que el dato expira en 1 hora. Si el usuario consulta después, se obtiene un dato fresco de la fuente.


Cache-Aside (Lazy Loading)

Definición: Patrón donde la aplicación primero consulta el caché; si no encuentra el dato, lo obtiene de la fuente principal y lo guarda en caché.

Por qué es importante: Solo se cachean los datos que realmente se consultan, optimizando el uso de memoria.

Ejemplo práctico: Al consultar un pedido, si no está en Redis, se obtiene de Elasticsearch y se guarda en Redis para la próxima consulta.


Sorted Set

Definición: Estructura de datos de Redis que almacena elementos únicos ordenados por un valor numérico llamado “score”.

Por qué es importante: Permite obtener rangos ordenados eficientemente, ideal para listados paginados y rankings.

Ejemplo práctico: Almacenar pedidos de un cliente usando el timestamp como score para obtener “los 10 pedidos más recientes” instantáneamente.


Pipeline

Definición: Técnica que agrupa múltiples comandos Redis en una sola llamada de red, ejecutándolos secuencialmente en el servidor.

Por qué es importante: Reduce drásticamente la latencia al eliminar el round-trip de red entre comandos individuales.

Ejemplo práctico: Enviar 100 comandos SET en un pipeline toma casi lo mismo que enviar 1, en lugar de 100 veces más.


Invalidación de Caché

Definición: Proceso de eliminar o marcar como obsoletos los datos en caché cuando la fuente de verdad cambia.

Por qué es importante: Garantiza consistencia entre el caché y los datos reales. Datos stale (obsoletos) pueden causar errores de negocio.

Ejemplo práctico: Cuando un pedido se actualiza, eliminamos order:123 del caché para que la próxima lectura obtenga el dato actualizado.


Operaciones Atómicas

Definición: Operaciones que se ejecutan completamente o no se ejecutan en absoluto, sin estados intermedios visibles.

Por qué es importante: Garantiza consistencia en operaciones concurrentes. Múltiples incrementos simultáneos no perderán datos.

Ejemplo práctico: INCR stats:orders:2024-01-15 incrementa el contador de forma segura incluso con miles de requests concurrentes.


← Capítulo 13: Proyecciones | Capítulo 15: Go Arquitectura →