← Volver al listado de tecnologías

Capítulo 18: Event Upcasting Avanzado

Por: SiempreListo
event-sourcingupcastingmigracióntransformación

Capítulo 18: Event Upcasting Avanzado

“El upcasting permite que el código actual funcione con eventos históricos”

Introducción

El capítulo anterior cubrió upcasting básico (agregar campos). Este capítulo aborda escenarios más complejos que requieren transformaciones estructurales significativas:

Estas operaciones son más delicadas y requieren testing exhaustivo.

Escenarios Complejos

1. Dividir un Evento en Dos

A veces un evento contiene demasiada información y queremos separar responsabilidades. El upcaster puede generar múltiples eventos a partir de uno solo.

Escenario: OrderCreated original incluía shippingAddress. Ahora queremos que la dirección sea un evento separado ShippingAddressSet para mayor flexibilidad:

// Antes: Un solo evento con todo
interface OrderCreatedOld {
  orderId: string;
  customerId: string;
  items: OrderItem[];
  shippingAddress: Address;
}

// Después: Dos eventos
interface OrderCreatedNew {
  orderId: string;
  customerId: string;
  items: OrderItem[];
}

interface ShippingAddressSet {
  orderId: string;
  address: Address;
}

// Upcaster que produce múltiples eventos
function splitOrderCreated(event: StoredEvent): StoredEvent[] {
  if (event.eventType !== 'OrderCreated') return [event];
  if (event.data.version >= 2) return [event];

  const data = event.data as OrderCreatedOld;

  return [
    {
      ...event,
      data: {
        orderId: data.orderId,
        customerId: data.customerId,
        items: data.items,
        version: 2
      }
    },
    {
      ...event,
      globalPosition: event.globalPosition,
      streamPosition: event.streamPosition + 0.5, // Posición intermedia
      eventType: 'ShippingAddressSet',
      data: {
        orderId: data.orderId,
        address: data.shippingAddress
      }
    }
  ];
}

2. Combinar Eventos

El caso inverso: dos eventos que siempre van juntos pueden consolidarse en uno. Esto requiere buffering temporal mientras esperamos el segundo evento.

Escenario: Antes teníamos ItemAddedToCart seguido de ItemPriceSet. Ahora queremos un solo CartItemAdded con el precio incluido:

// Antes: Dos eventos separados
// ItemAddedToCart + ItemPriceSet

// Después: Un evento consolidado
// CartItemAdded (con precio incluido)

class CombiningUpcaster {
  private buffer: Map<string, StoredEvent[]> = new Map();

  process(event: StoredEvent): StoredEvent | null {
    if (event.eventType === 'ItemAddedToCart') {
      // Buffering hasta encontrar el precio
      const key = `${event.data.cartId}-${event.data.productId}`;
      this.buffer.set(key, [event]);
      return null; // No emitir aún
    }

    if (event.eventType === 'ItemPriceSet') {
      const key = `${event.data.cartId}-${event.data.productId}`;
      const buffered = this.buffer.get(key);

      if (buffered) {
        this.buffer.delete(key);
        // Combinar en nuevo evento
        return {
          ...buffered[0],
          eventType: 'CartItemAdded',
          data: {
            cartId: event.data.cartId,
            productId: event.data.productId,
            quantity: buffered[0].data.quantity,
            price: event.data.price
          }
        };
      }
    }

    return event;
  }
}

3. Renombrar Tipo de Evento

El escenario más simple: el nombre del evento cambió pero la estructura es idéntica. Un simple mapeo resuelve esto:

// Mapeo de nombres antiguos a nuevos
const eventTypeMap: Record<string, string> = {
  'OrderPlaced': 'OrderCreated',
  'PaymentMade': 'PaymentReceived',
  'ItemAddedToOrder': 'OrderItemAdded'
};

function renameEventType(event: StoredEvent): StoredEvent {
  const newType = eventTypeMap[event.eventType];
  if (newType) {
    return { ...event, eventType: newType };
  }
  return event;
}

Pipeline de Upcasting

Cuando hay múltiples tipos de transformaciones, organizarlas en un pipeline con etapas definidas garantiza orden y claridad. Cada etapa procesa todos los eventos antes de pasar a la siguiente:

// src/infrastructure/event-store/upcasting-pipeline.ts
type UpcasterStage = (events: StoredEvent[]) => StoredEvent[];

export class UpcastingPipeline {
  private stages: UpcasterStage[] = [];

  addStage(stage: UpcasterStage): this {
    this.stages.push(stage);
    return this;
  }

  process(events: StoredEvent[]): StoredEvent[] {
    return this.stages.reduce(
      (current, stage) => stage(current),
      events
    );
  }
}

// Configuración del pipeline
const pipeline = new UpcastingPipeline()
  // Etapa 1: Renombrar tipos
  .addStage(events => events.map(renameEventType))
  // Etapa 2: Actualizar versiones de payload
  .addStage(events => events.map(e => ({
    ...e,
    data: upcaster.upcast(e.eventType, e.data)
  })))
  // Etapa 3: Dividir eventos complejos
  .addStage(events => events.flatMap(splitOrderCreated))
  // Etapa 4: Ordenar por posición
  .addStage(events => events.sort((a, b) =>
    a.streamPosition - b.streamPosition
  ));

Migración de Datos

Copy-Transform Pattern

Para migraciones grandes o cuando el upcasting en tiempo real no es suficiente, el patrón Copy-Transform crea nuevos streams con datos transformados. Este enfoque:

async function migrateStream(
  sourceStore: EventStore,
  targetStore: EventStore,
  streamId: string,
  transformer: (events: StoredEvent[]) => StoredEvent[]
): Promise<void> {
  const sourceEvents = await sourceStore.readStream(streamId);
  const transformedEvents = transformer(sourceEvents);

  // Nuevo stream con sufijo de versión
  const targetStreamId = `${streamId}-v2`;

  // Re-numerar posiciones
  const renumbered = transformedEvents.map((event, index) => ({
    ...event,
    streamPosition: index
  }));

  // Escribir en nuevo stream
  for (const event of renumbered) {
    await targetStore.append(
      targetStreamId,
      [event],
      event.streamPosition - 1
    );
  }

  console.log(`Migrated ${streamId} -> ${targetStreamId}`);
}

// Migración masiva
async function migrateAllStreams(
  sourceStore: EventStore,
  targetStore: EventStore
): Promise<void> {
  const allEvents = await sourceStore.readAll(0n, 1000000);

  // Agrupar por stream
  const streams = new Map<string, StoredEvent[]>();
  for (const event of allEvents) {
    const existing = streams.get(event.streamId) ?? [];
    existing.push(event);
    streams.set(event.streamId, existing);
  }

  // Migrar cada stream
  for (const [streamId, events] of streams) {
    await migrateStream(
      sourceStore,
      targetStore,
      streamId,
      pipeline.process.bind(pipeline)
    );
  }
}

Testing de Migraciones

Los tests de migración son críticos: un error puede corromper datos irrecuperablemente. Cada escenario debe testearse con datos reales o muy similares:

describe('Event Migration', () => {
  describe('OrderCreated V1 to V2', () => {
    it('should add customer email with default', () => {
      const v1Event: StoredEvent = {
        globalPosition: 1n,
        streamId: 'order-123',
        streamPosition: 0,
        eventType: 'OrderCreated',
        data: {
          orderId: 'order-123',
          customerId: 'cust-456',
          items: []
        },
        metadata: {},
        createdAt: new Date()
      };

      const result = pipeline.process([v1Event]);

      expect(result[0].data.customerEmail).toBe('[email protected]');
      expect(result[0].data.version).toBe(2);
    });
  });

  describe('Event splitting', () => {
    it('should split OrderCreated with address into two events', () => {
      const oldEvent: StoredEvent = {
        globalPosition: 1n,
        streamId: 'order-123',
        streamPosition: 0,
        eventType: 'OrderCreated',
        data: {
          orderId: 'order-123',
          customerId: 'cust-456',
          items: [],
          shippingAddress: {
            street: '123 Main St',
            city: 'Springfield'
          }
        },
        metadata: {},
        createdAt: new Date()
      };

      const result = pipeline.process([oldEvent]);

      expect(result).toHaveLength(2);
      expect(result[0].eventType).toBe('OrderCreated');
      expect(result[1].eventType).toBe('ShippingAddressSet');
    });
  });

  describe('Full migration', () => {
    it('should migrate stream preserving order', async () => {
      const source = new InMemoryEventStore();
      const target = new InMemoryEventStore();

      // Setup source data
      await source.append('order-1', [
        createOldOrderCreated(),
        createOldItemAdded()
      ], -1);

      // Migrate
      await migrateStream(source, target, 'order-1', pipeline.process.bind(pipeline));

      // Verify
      const migrated = await target.readStream('order-1-v2');
      expect(migrated.length).toBeGreaterThan(0);
    });
  });
});

Monitoreo de Upcasting

class UpcastingMetrics {
  private upcastCounts: Map<string, number> = new Map();
  private upcastTimes: Map<string, number[]> = new Map();

  recordUpcast(eventType: string, durationMs: number): void {
    const count = this.upcastCounts.get(eventType) ?? 0;
    this.upcastCounts.set(eventType, count + 1);

    const times = this.upcastTimes.get(eventType) ?? [];
    times.push(durationMs);
    this.upcastTimes.set(eventType, times);
  }

  getReport(): Record<string, { count: number; avgTime: number }> {
    const report: Record<string, { count: number; avgTime: number }> = {};

    for (const [type, count] of this.upcastCounts) {
      const times = this.upcastTimes.get(type) ?? [];
      const avgTime = times.reduce((a, b) => a + b, 0) / times.length;
      report[type] = { count, avgTime };
    }

    return report;
  }
}

Resumen

Glosario

Pipeline de Transformación

Definición: Secuencia de etapas que procesan datos en orden, donde la salida de una etapa es la entrada de la siguiente.

Por qué es importante: Organiza transformaciones complejas de forma mantenible; cada etapa tiene una responsabilidad clara.

Ejemplo práctico: Etapa 1 renombra tipos -> Etapa 2 actualiza payloads -> Etapa 3 divide eventos -> Etapa 4 reordena.


Event Splitting

Definición: Transformación donde un evento antiguo se convierte en múltiples eventos durante el upcasting.

Por qué es importante: Permite corregir decisiones de diseño donde se agrupó demasiada información en un solo evento.

Ejemplo práctico: OrderCreated con dirección se divide en OrderCreated + ShippingAddressSet, permitiendo que la dirección se modifique independientemente.


Event Combining

Definición: Transformación donde múltiples eventos se fusionan en uno solo durante el upcasting.

Por qué es importante: Simplifica el modelo cuando eventos siempre ocurren juntos y no tienen sentido por separado.

Ejemplo práctico: ItemAddedToCart + ItemPriceSet se combinan en CartItemAdded con precio incluido.


Buffering en Upcasting

Definición: Almacenamiento temporal de eventos mientras esperamos eventos relacionados para combinarlos.

Por qué es importante: La combinación de eventos requiere tener acceso a ambos; el buffer mantiene el primero hasta encontrar el segundo.

Ejemplo práctico: Map<key, Event[]> guarda ItemAddedToCart indexado por cartId+productId; cuando llega ItemPriceSet con misma key, se combinan.


Copy-Transform Pattern

Definición: Patrón de migración donde se crean nuevos streams con datos transformados en lugar de modificar los originales.

Por qué es importante: Mantiene datos originales para rollback, permite migración gradual, y evita modificar el log inmutable.

Ejemplo práctico: Stream order-123 se transforma a order-123-v2; cuando la migración se valida, el código lee de la versión nueva.


Stream Renumbering

Definición: Proceso de reasignar posiciones consecutivas (0, 1, 2…) a eventos durante migración cuando la transformación cambió la cantidad.

Por qué es importante: Después de dividir o combinar eventos, las posiciones originales ya no son consecutivas; debemos renumerarlas.

Ejemplo práctico: Después de dividir un evento en posición 5 en dos, el stream tiene posiciones …, 4, 5, 5.5, 6… que se renumeran a …, 4, 5, 6, 7…


Upcasting Metrics

Definición: Mediciones de rendimiento y uso del sistema de upcasting: conteo por tipo de evento, tiempo de transformación, etc.

Por qué es importante: Identifica cuellos de botella y eventos problemáticos que requieren optimización o migración definitiva.

Ejemplo práctico: Si OrderCreated V1->V2 toma 50ms promedio (muy alto), considerar migrar esos eventos permanentemente.


Idempotent Migration

Definición: Migración que puede ejecutarse múltiples veces con el mismo resultado, sin crear duplicados ni corrupción.

Por qué es importante: Si la migración falla a mitad, puede reiniciarse sin efectos adversos.

Ejemplo práctico: Verificar si order-123-v2 existe antes de crearlo; si existe, saltar a la siguiente orden.


← Capítulo 17: Versionado | Capítulo 19: Frontend React →