← Volver al listado de tecnologías

Capítulo 8: Event Store con PostgreSQL

Por: SiempreListo
event-sourcingpostgresqldrizzleevent-store

Capítulo 8: Event Store con PostgreSQL

“PostgreSQL es una excelente opción para Event Stores de escala media”

Interface del Event Store

Antes de la implementación, definimos una interfaz (contrato) que describe las operaciones del Event Store. Esto permite:

  1. Tener múltiples implementaciones (PostgreSQL, in-memory para tests)
  2. El dominio depende de la interfaz, no de la implementación
  3. Fácil cambio de tecnología en el futuro

Las operaciones fundamentales son:

// src/infrastructure/event-store/types.ts
import { DomainEvent } from '@domain/events/base';

export interface StoredEvent {
  globalPosition: bigint;
  streamId: string;
  streamPosition: number;
  eventType: string;
  data: unknown;
  metadata: unknown;
  createdAt: Date;
}

export interface AppendResult {
  nextExpectedVersion: number;
  globalPosition: bigint;
  eventsAppended: number;
}

export interface ReadOptions {
  fromVersion?: number;
  maxCount?: number;
}

export interface SubscriptionOptions {
  fromPosition?: bigint;
  batchSize?: number;
}

export class ConcurrencyError extends Error {
  constructor(
    public streamId: string,
    public expectedVersion: number,
    public actualVersion: number
  ) {
    super(
      `Concurrency conflict on stream ${streamId}: ` +
      `expected version ${expectedVersion}, actual ${actualVersion}`
    );
    this.name = 'ConcurrencyError';
  }
}

export class StreamNotFoundError extends Error {
  constructor(public streamId: string) {
    super(`Stream not found: ${streamId}`);
    this.name = 'StreamNotFoundError';
  }
}

export interface EventStore {
  append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<AppendResult>;

  readStream(streamId: string, options?: ReadOptions): Promise<StoredEvent[]>;

  readAll(fromPosition?: bigint, maxCount?: number): Promise<StoredEvent[]>;

  getStreamVersion(streamId: string): Promise<number>;

  streamExists(streamId: string): Promise<boolean>;
}

Implementacion PostgreSQL

La implementación usa varios patrones importantes:

// src/infrastructure/event-store/postgres-event-store.ts
import { eq, and, gte, sql, desc } from 'drizzle-orm';
import { Database } from '../database/connection';
import { events } from '../database/schema';
import { DomainEvent } from '@domain/events/base';
import {
  EventStore,
  StoredEvent,
  AppendResult,
  ReadOptions,
  ConcurrencyError
} from './types';

export class PostgresEventStore implements EventStore {
  constructor(private db: Database) {}

  async append(
    streamId: string,
    domainEvents: DomainEvent[],
    expectedVersion: number
  ): Promise<AppendResult> {
    if (domainEvents.length === 0) {
      return {
        nextExpectedVersion: expectedVersion,
        globalPosition: 0n,
        eventsAppended: 0
      };
    }

    return await this.db.transaction(async (tx) => {
      // Check current version with lock
      const [currentVersion] = await tx
        .select({ maxPosition: sql<number>`COALESCE(MAX(stream_position), -1)` })
        .from(events)
        .where(eq(events.streamId, streamId))
        .for('update');

      const actualVersion = currentVersion?.maxPosition ?? -1;

      if (actualVersion !== expectedVersion) {
        throw new ConcurrencyError(streamId, expectedVersion, actualVersion);
      }

      // Insert events
      const toInsert = domainEvents.map((event, index) => ({
        streamId,
        streamPosition: expectedVersion + 1 + index,
        eventType: event.eventType,
        data: event.payload,
        metadata: event.metadata
      }));

      const inserted = await tx
        .insert(events)
        .values(toInsert)
        .returning({ globalPosition: events.globalPosition });

      const lastPosition = inserted[inserted.length - 1].globalPosition;

      return {
        nextExpectedVersion: expectedVersion + domainEvents.length,
        globalPosition: BigInt(lastPosition),
        eventsAppended: domainEvents.length
      };
    });
  }

  async readStream(streamId: string, options?: ReadOptions): Promise<StoredEvent[]> {
    let query = this.db
      .select()
      .from(events)
      .where(eq(events.streamId, streamId))
      .orderBy(events.streamPosition);

    if (options?.fromVersion !== undefined) {
      query = this.db
        .select()
        .from(events)
        .where(and(
          eq(events.streamId, streamId),
          gte(events.streamPosition, options.fromVersion)
        ))
        .orderBy(events.streamPosition);
    }

    if (options?.maxCount) {
      query = query.limit(options.maxCount);
    }

    const rows = await query;

    return rows.map(row => ({
      globalPosition: BigInt(row.globalPosition),
      streamId: row.streamId,
      streamPosition: row.streamPosition,
      eventType: row.eventType,
      data: row.data,
      metadata: row.metadata,
      createdAt: row.createdAt ?? new Date()
    }));
  }

  async readAll(fromPosition?: bigint, maxCount = 1000): Promise<StoredEvent[]> {
    let query = this.db
      .select()
      .from(events)
      .orderBy(events.globalPosition)
      .limit(maxCount);

    if (fromPosition !== undefined) {
      query = this.db
        .select()
        .from(events)
        .where(gte(events.globalPosition, fromPosition.toString()))
        .orderBy(events.globalPosition)
        .limit(maxCount);
    }

    const rows = await query;

    return rows.map(row => ({
      globalPosition: BigInt(row.globalPosition),
      streamId: row.streamId,
      streamPosition: row.streamPosition,
      eventType: row.eventType,
      data: row.data,
      metadata: row.metadata,
      createdAt: row.createdAt ?? new Date()
    }));
  }

  async getStreamVersion(streamId: string): Promise<number> {
    const [result] = await this.db
      .select({ maxPosition: sql<number>`COALESCE(MAX(stream_position), -1)` })
      .from(events)
      .where(eq(events.streamId, streamId));

    return result?.maxPosition ?? -1;
  }

  async streamExists(streamId: string): Promise<boolean> {
    const version = await this.getStreamVersion(streamId);
    return version >= 0;
  }
}

Implementacion In-Memory (Testing)

Para tests unitarios, usar una base de datos real es lento y complejo. La implementación in-memory:

El método clear() reinicia el estado entre tests.

// src/infrastructure/event-store/in-memory-event-store.ts
import { DomainEvent } from '@domain/events/base';
import {
  EventStore,
  StoredEvent,
  AppendResult,
  ReadOptions,
  ConcurrencyError
} from './types';

export class InMemoryEventStore implements EventStore {
  private events: StoredEvent[] = [];
  private globalPosition = 0n;

  async append(
    streamId: string,
    domainEvents: DomainEvent[],
    expectedVersion: number
  ): Promise<AppendResult> {
    const streamEvents = this.events.filter(e => e.streamId === streamId);
    const actualVersion = streamEvents.length > 0
      ? Math.max(...streamEvents.map(e => e.streamPosition))
      : -1;

    if (actualVersion !== expectedVersion) {
      throw new ConcurrencyError(streamId, expectedVersion, actualVersion);
    }

    for (let i = 0; i < domainEvents.length; i++) {
      const event = domainEvents[i];
      this.globalPosition++;

      this.events.push({
        globalPosition: this.globalPosition,
        streamId,
        streamPosition: expectedVersion + 1 + i,
        eventType: event.eventType,
        data: event.payload,
        metadata: event.metadata,
        createdAt: new Date()
      });
    }

    return {
      nextExpectedVersion: expectedVersion + domainEvents.length,
      globalPosition: this.globalPosition,
      eventsAppended: domainEvents.length
    };
  }

  async readStream(streamId: string, options?: ReadOptions): Promise<StoredEvent[]> {
    let result = this.events
      .filter(e => e.streamId === streamId)
      .sort((a, b) => a.streamPosition - b.streamPosition);

    if (options?.fromVersion !== undefined) {
      result = result.filter(e => e.streamPosition >= options.fromVersion!);
    }

    if (options?.maxCount) {
      result = result.slice(0, options.maxCount);
    }

    return result;
  }

  async readAll(fromPosition?: bigint, maxCount = 1000): Promise<StoredEvent[]> {
    let result = [...this.events].sort(
      (a, b) => Number(a.globalPosition - b.globalPosition)
    );

    if (fromPosition !== undefined) {
      result = result.filter(e => e.globalPosition >= fromPosition);
    }

    return result.slice(0, maxCount);
  }

  async getStreamVersion(streamId: string): Promise<number> {
    const streamEvents = this.events.filter(e => e.streamId === streamId);
    if (streamEvents.length === 0) return -1;
    return Math.max(...streamEvents.map(e => e.streamPosition));
  }

  async streamExists(streamId: string): Promise<boolean> {
    return this.events.some(e => e.streamId === streamId);
  }

  // Testing helpers
  clear(): void {
    this.events = [];
    this.globalPosition = 0n;
  }

  getAll(): StoredEvent[] {
    return [...this.events];
  }
}

Testing del Event Store

// src/infrastructure/event-store/postgres-event-store.test.ts
import { describe, it, expect, beforeEach } from 'vitest';
import { InMemoryEventStore } from './in-memory-event-store';
import { ConcurrencyError } from './types';
import { createEvent } from '@domain/events/base';

describe('EventStore', () => {
  let store: InMemoryEventStore;

  beforeEach(() => {
    store = new InMemoryEventStore();
  });

  describe('append', () => {
    it('should append events to new stream', async () => {
      const event = createEvent('TestEvent', 'stream-1', 'Test', 0, { data: 'test' });

      const result = await store.append('stream-1', [event], -1);

      expect(result.eventsAppended).toBe(1);
      expect(result.nextExpectedVersion).toBe(0);
    });

    it('should append multiple events', async () => {
      const events = [
        createEvent('Event1', 'stream-1', 'Test', 0, { n: 1 }),
        createEvent('Event2', 'stream-1', 'Test', 1, { n: 2 }),
        createEvent('Event3', 'stream-1', 'Test', 2, { n: 3 })
      ];

      const result = await store.append('stream-1', events, -1);

      expect(result.eventsAppended).toBe(3);
      expect(result.nextExpectedVersion).toBe(2);
    });

    it('should throw ConcurrencyError on version mismatch', async () => {
      const event1 = createEvent('Event1', 'stream-1', 'Test', 0, {});
      await store.append('stream-1', [event1], -1);

      const event2 = createEvent('Event2', 'stream-1', 'Test', 1, {});

      await expect(store.append('stream-1', [event2], -1))
        .rejects.toThrow(ConcurrencyError);
    });
  });

  describe('readStream', () => {
    it('should read all events from stream', async () => {
      const events = [
        createEvent('Event1', 'stream-1', 'Test', 0, { n: 1 }),
        createEvent('Event2', 'stream-1', 'Test', 1, { n: 2 })
      ];
      await store.append('stream-1', events, -1);

      const stored = await store.readStream('stream-1');

      expect(stored).toHaveLength(2);
      expect(stored[0].eventType).toBe('Event1');
      expect(stored[1].eventType).toBe('Event2');
    });

    it('should read from specific version', async () => {
      const events = [
        createEvent('Event1', 'stream-1', 'Test', 0, {}),
        createEvent('Event2', 'stream-1', 'Test', 1, {}),
        createEvent('Event3', 'stream-1', 'Test', 2, {})
      ];
      await store.append('stream-1', events, -1);

      const stored = await store.readStream('stream-1', { fromVersion: 1 });

      expect(stored).toHaveLength(2);
      expect(stored[0].streamPosition).toBe(1);
    });

    it('should return empty array for non-existent stream', async () => {
      const stored = await store.readStream('non-existent');
      expect(stored).toHaveLength(0);
    });
  });

  describe('readAll', () => {
    it('should read events across all streams in order', async () => {
      await store.append('stream-1', [
        createEvent('A1', 'stream-1', 'Test', 0, {})
      ], -1);
      await store.append('stream-2', [
        createEvent('B1', 'stream-2', 'Test', 0, {})
      ], -1);
      await store.append('stream-1', [
        createEvent('A2', 'stream-1', 'Test', 1, {})
      ], 0);

      const all = await store.readAll();

      expect(all).toHaveLength(3);
      expect(all[0].eventType).toBe('A1');
      expect(all[1].eventType).toBe('B1');
      expect(all[2].eventType).toBe('A2');
    });
  });

  describe('getStreamVersion', () => {
    it('should return -1 for new stream', async () => {
      const version = await store.getStreamVersion('new-stream');
      expect(version).toBe(-1);
    });

    it('should return latest version', async () => {
      await store.append('stream-1', [
        createEvent('E1', 'stream-1', 'Test', 0, {}),
        createEvent('E2', 'stream-1', 'Test', 1, {})
      ], -1);

      const version = await store.getStreamVersion('stream-1');
      expect(version).toBe(1);
    });
  });
});

Resumen

Glosario

Interfaz (Interface)

Definicion: Contrato que define las operaciones que una clase debe implementar, sin especificar como implementarlas.

Por que es importante: Permite multiples implementaciones intercambiables. El codigo que usa el Event Store no sabe si es PostgreSQL, MongoDB, o in-memory.

Ejemplo practico: interface EventStore { append(...), readStream(...) }. Tanto PostgresEventStore como InMemoryEventStore implementan esta interfaz.


Transaccion de Base de Datos

Definicion: Grupo de operaciones que se ejecutan como una unidad atomica: todas tienen exito o todas fallan.

Por que es importante: Garantiza consistencia. Si verificas version y luego insertas, ambas operaciones deben ser atomicas para evitar race conditions.

Ejemplo practico: db.transaction(async (tx) => { checkVersion(); insertEvents(); }). Si insertEvents falla, checkVersion tambien se revierte.


FOR UPDATE (Row Locking)

Definicion: Clausula SQL que bloquea las filas leidas hasta que la transaccion termine, previniendo modificaciones concurrentes.

Por que es importante: Previene race conditions al verificar version. Sin bloqueo, dos transacciones podrian leer la misma version y ambas intentar escribir.

Ejemplo practico: SELECT MAX(version) FROM events WHERE stream_id = ? FOR UPDATE bloquea el stream mientras verificas y escribes.


JSONB (PostgreSQL)

Definicion: Tipo de datos de PostgreSQL para almacenar JSON de forma binaria optimizada, con soporte para consultas e indices.

Por que es importante: Permite guardar eventos con diferentes estructuras en la misma columna, manteniendo capacidad de consulta.

Ejemplo practico: data JSONB NOT NULL almacena el payload del evento. Puedes consultar WHERE data->>'customerId' = '123'.


BIGSERIAL

Definicion: Tipo de PostgreSQL que genera automaticamente enteros de 64 bits secuenciales (1, 2, 3…).

Por que es importante: La posicion global necesita ser unica y ordenada. BIGSERIAL garantiza secuencialidad sin coordinacion manual.

Ejemplo practico: global_position BIGSERIAL PRIMARY KEY genera automaticamente 1, 2, 3… para cada nuevo evento insertado.


ConcurrencyError

Definicion: Error que indica que el agregado fue modificado por otro proceso entre la lectura y la escritura.

Por que es importante: Es parte fundamental del control de concurrencia optimista. Indica que debes reintentar la operacion.

Ejemplo practico: Lees pedido (version 5), otro usuario confirma (version 6), intentas modificar esperando version 5, recibes ConcurrencyError.


Test Double (In-Memory Implementation)

Definicion: Implementacion simplificada usada en tests en lugar del componente real (base de datos, API externa).

Por que es importante: Los tests deben ser rapidos, aislados, y deterministas. Usar una base de datos real viola estos principios.

Ejemplo practico: InMemoryEventStore almacena eventos en un array de JavaScript. Los tests corren en milisegundos sin necesidad de PostgreSQL.


Indice de Base de Datos

Definicion: Estructura de datos que mejora la velocidad de consultas a cambio de espacio adicional y escrituras mas lentas.

Por que es importante: Sin indices, cada consulta escanea toda la tabla. Con indices apropiados, las consultas frecuentes son rapidas.

Ejemplo practico: CREATE INDEX idx_stream_id ON events(stream_id) permite encontrar eventos de un stream especifico sin escanear millones de filas.


UNIQUE Constraint

Definicion: Restriccion de base de datos que previene valores duplicados en una columna o combinacion de columnas.

Por que es importante: UNIQUE(stream_id, stream_position) garantiza que no puedes tener dos eventos con la misma posicion en un stream.

Ejemplo practico: Si intentas insertar un evento con posicion 5 en un stream que ya tiene posicion 5, la base de datos rechaza la operacion.


← Capítulo 7: Modelando Eventos | Capítulo 9: Agregado Order →