← Volver al listado de tecnologías

Manejo de Consistencia Eventual

Por: SiempreListo
cqrsconsistencia-eventualarquitecturatypescriptgopython

Manejo de Consistencia Eventual

En este capitulo abordaremos uno de los desafios mas importantes de CQRS: la consistencia eventual. Aprenderemos que significa, por que ocurre y como manejarlo de forma elegante.

Que es la Consistencia Eventual

En CQRS, cuando guardas datos en el Write Model, el Read Model no se actualiza instantaneamente. Existe un lag (retraso) entre el momento en que se escribe y el momento en que los datos estan disponibles para lectura. Este comportamiento se llama consistencia eventual.

La palabra “eventual” significa que los datos eventualmente (en algun momento) seran consistentes entre ambos modelos, pero no hay garantia de que lo sean en un instante especifico.

Esto contrasta con la consistencia inmediata (o consistencia fuerte) de una base de datos tradicional, donde inmediatamente despues de escribir puedes leer el dato actualizado.

El Problema en Detalle

Veamos el flujo temporal para entender el problema:

1. Usuario crea pedido → Write Model actualizado
2. Evento publicado → En cola
3. Proyeccion procesa → Read Model actualiza (100ms-5s despues)
4. Usuario consulta → Puede no ver su pedido aun

El escenario problematico tipico: un usuario crea un pedido, el sistema confirma “Pedido creado exitosamente”, el usuario es redirigido a “Mis Pedidos”… y su pedido no aparece en la lista. Esto ocurre porque el Read Model aun no ha procesado el evento de creacion.

El lag puede variar desde milisegundos hasta segundos, dependiendo de la carga del sistema, la velocidad del procesamiento de eventos y la complejidad de las proyecciones.

Estrategias de Manejo

Existen varias estrategias para manejar la consistencia eventual, cada una con sus ventajas y casos de uso.

1. Optimistic UI (Interfaz Optimista)

La estrategia mas comun en aplicaciones modernas. El cliente asume que la operacion tendra exito y muestra el cambio inmediatamente en la interfaz, sin esperar confirmacion del servidor.

Si la operacion falla, se revierte el cambio visual y se notifica al usuario:

// TypeScript - Cliente
async function createOrder(orderData: CreateOrderDTO) {
  // Actualiza UI optimistamente
  const tempOrder = { ...orderData, id: "temp-" + Date.now(), status: "pending" };
  addToOrderList(tempOrder);

  try {
    const result = await api.createOrder(orderData);
    // Reemplaza con datos reales
    replaceOrder(tempOrder.id, result.orderId);
  } catch (error) {
    // Revierte cambio optimista
    removeFromOrderList(tempOrder.id);
    showError("Error al crear pedido");
  }
}

2. Read Your Own Writes (Lee tus Propias Escrituras)

El Command Handler retorna suficiente informacion para mostrar la entidad inmediatamente, sin necesidad de consultar el Read Model.

Esta estrategia funciona porque el handler tiene acceso al agregado justo despues de crearlo, antes de que se procesen los eventos:

// TypeScript - Handler retorna datos mínimos
class CreateOrderHandler implements CommandHandler<CreateOrderCommand> {
  async execute(command: CreateOrderCommand): Promise<OrderCreatedResult> {
    const order = OrderAggregate.create(command);
    await this.repository.save(order);
    await this.eventBus.publish(order.pullEvents());

    // Retorna datos para UI inmediata
    return {
      orderId: order.id,
      status: order.status,
      total: order.calculateTotal(),
      createdAt: order.createdAt
    };
  }
}
// Go
type OrderCreatedResult struct {
    OrderID   string    `json:"orderId"`
    Status    string    `json:"status"`
    Total     float64   `json:"total"`
    CreatedAt time.Time `json:"createdAt"`
}

func (h *CreateOrderHandler) Execute(ctx context.Context, cmd CreateOrderCommand) (*OrderCreatedResult, error) {
    order := NewOrderAggregate(cmd)
    if err := h.repo.Save(ctx, order); err != nil {
        return nil, err
    }
    h.eventBus.Publish(ctx, order.PullEvents())

    return &OrderCreatedResult{
        OrderID:   order.ID(),
        Status:    order.Status(),
        Total:     order.CalculateTotal(),
        CreatedAt: order.CreatedAt(),
    }, nil
}
# Python
@dataclass
class OrderCreatedResult:
    order_id: str
    status: str
    total: Decimal
    created_at: datetime

class CreateOrderHandler(CommandHandler[CreateOrderCommand]):
    async def execute(self, command: CreateOrderCommand) -> OrderCreatedResult:
        order = OrderAggregate.create(command)
        await self.repository.save(order)
        await self.event_bus.publish(order.pull_events())

        return OrderCreatedResult(
            order_id=order.id,
            status=order.status,
            total=order.calculate_total(),
            created_at=order.created_at
        )

3. Polling con Timeout (Consulta Repetida)

El cliente consulta repetidamente el Read Model hasta que aparezcan los datos o se agote el tiempo de espera. Es util cuando necesitas datos completos del Read Model (no solo lo basico):

// TypeScript - Polling
async function waitForOrder(orderId: string, maxWait = 5000): Promise<OrderView> {
  const startTime = Date.now();
  const pollInterval = 200;

  while (Date.now() - startTime < maxWait) {
    const order = await api.getOrder(orderId);
    if (order) return order;
    await sleep(pollInterval);
  }

  throw new Error("Order not available yet");
}

4. Webhooks/SSE para Notificacion Push

En lugar de que el cliente pregunte repetidamente (polling), el servidor notifica activamente cuando el Read Model esta listo. Esto es mas eficiente y proporciona mejor experiencia de usuario.

SSE (Server-Sent Events) permite que el servidor envie actualizaciones al cliente sobre una conexion HTTP abierta:

// TypeScript - Server-Sent Events
class OrderProjection {
  async handle(event: OrderCreatedEvent): Promise<void> {
    const readModel = this.buildReadModel(event);
    await this.readRepository.save(readModel);

    // Notifica al cliente
    this.sseService.notify(event.customerId, {
      type: "ORDER_READY",
      orderId: event.orderId
    });
  }
}
// Go
func (p *OrderProjection) Handle(ctx context.Context, event OrderCreatedEvent) error {
    readModel := p.buildReadModel(event)
    if err := p.readRepo.Save(ctx, readModel); err != nil {
        return err
    }

    // Notifica al cliente
    p.sseService.Notify(event.CustomerID, SSEMessage{
        Type:    "ORDER_READY",
        OrderID: event.OrderID,
    })
    return nil
}
# Python
class OrderProjection:
    async def handle(self, event: OrderCreatedEvent) -> None:
        read_model = self._build_read_model(event)
        await self.read_repository.save(read_model)

        # Notifica al cliente
        await self.sse_service.notify(
            event.customer_id,
            {"type": "ORDER_READY", "orderId": event.order_id}
        )

5. Version Tokens (Tokens de Version)

Cada Read Model incluye un numero de version que incrementa con cada actualizacion. El cliente puede solicitar una version minima y detectar si los datos estan desactualizados (stale):

// TypeScript
interface OrderReadModel {
  id: string;
  version: number;  // Incrementa con cada actualización
  // ... otros campos
}

// Cliente puede solicitar versión mínima
async function getOrder(id: string, minVersion?: number): Promise<OrderReadModel> {
  const order = await api.getOrder(id);

  if (minVersion && order.version < minVersion) {
    // Datos obsoletos, reintentar o notificar
    throw new StaleDataError("Order data is stale");
  }

  return order;
}

Comunicacion con el Usuario: Ser Transparente

La mejor estrategia es ser transparente con el usuario sobre el estado del procesamiento. No escondas la consistencia eventual; disenala en la experiencia de usuario:

// TypeScript - UI Component
function OrderConfirmation({ orderId }: Props) {
  const [order, setOrder] = useState<OrderView | null>(null);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    waitForOrder(orderId, 5000)
      .then(setOrder)
      .catch(() => setOrder(null))
      .finally(() => setLoading(false));
  }, [orderId]);

  if (loading) {
    return <div>Procesando tu pedido...</div>;
  }

  return order
    ? <OrderDetails order={order} />
    : <div>Tu pedido está siendo procesado. Recibirás confirmación pronto.</div>;
}

Proximos Pasos

En el siguiente capitulo configuraremos el proyecto TypeScript base para implementar CQRS.


Glosario

Consistencia Eventual

Definicion: Modelo de consistencia donde se garantiza que, si no se realizan nuevas actualizaciones, eventualmente todas las lecturas retornaran el ultimo valor escrito. No hay garantia de cuando ocurrira esta sincronizacion.

Por que es importante: Es el compromiso que hacemos en CQRS al separar Write y Read Models. Ganamos rendimiento y escalabilidad, pero debemos manejar el periodo de inconsistencia.

Ejemplo practico: Creas un pedido a las 10:00:00.000. El Write Model lo tiene inmediatamente. El Read Model lo tiene a las 10:00:00.250. Durante esos 250ms, hay inconsistencia.


Consistencia Inmediata (o Fuerte)

Definicion: Modelo de consistencia donde cualquier lectura posterior a una escritura siempre retorna el valor actualizado. Es el comportamiento por defecto de las bases de datos relacionales tradicionales.

Por que es importante: Es lo que perdemos al adoptar CQRS con modelos separados. Es importante entender el contraste para manejar las expectativas.

Ejemplo practico: En una base de datos tradicional, si insertas un registro y luego haces SELECT, siempre veras el registro insertado.


Optimistic UI (Interfaz Optimista)

Definicion: Patron de interfaz donde los cambios se muestran inmediatamente en el cliente, asumiendo que la operacion tendra exito. Si falla, se revierten los cambios visuales.

Por que es importante: Proporciona sensacion de velocidad al usuario. Elimina la espera de confirmacion del servidor para operaciones que usualmente funcionan.

Ejemplo practico: Al dar “like” a un post, el corazon se pinta inmediatamente. Si el servidor falla, se despinta y muestra un error.


Polling

Definicion: Tecnica donde el cliente realiza consultas repetidas al servidor a intervalos regulares para verificar si hay nuevos datos disponibles.

Por que es importante: Es simple de implementar y no requiere conexiones persistentes. Util cuando el servidor no soporta notificaciones push.

Ejemplo practico: Consultar cada 200ms si el pedido ya aparece en el Read Model, hasta un maximo de 5 segundos.


Server-Sent Events (SSE)

Definicion: Tecnologia que permite al servidor enviar actualizaciones automaticas al cliente sobre una conexion HTTP de larga duracion. El cliente abre la conexion una vez y recibe eventos cuando el servidor los emite.

Por que es importante: Mas eficiente que polling para notificaciones en tiempo real. El servidor solo envia datos cuando hay algo nuevo.

Ejemplo practico: La proyeccion, al actualizar el Read Model, notifica via SSE al cliente que el pedido esta listo para consultarse.


Stale Data (Datos Obsoletos)

Definicion: Datos que ya no reflejan el estado actual del sistema. En el contexto de CQRS, el Read Model contiene datos stale durante el periodo de sincronizacion.

Por que es importante: Reconocer que los datos pueden estar obsoletos permite disenar UIs que manejen esta situacion gracefully.

Ejemplo practico: El Read Model muestra el pedido con status “pending”, pero el Write Model ya lo tiene como “confirmed”. Los datos del Read Model estan stale.


Version Token

Definicion: Numero o identificador que se incrementa cada vez que un recurso se actualiza. Permite detectar si una copia de los datos esta desactualizada.

Por que es importante: Permite al cliente solicitar datos con una version minima, garantizando que no recibe datos obsoletos de un cache o Read Model desactualizado.

Ejemplo practico: El pedido tiene version 5. El cliente, despues de modificarlo, solicita version >= 6 para asegurar que ve los cambios recientes.