← Volver al listado de tecnologías

Persistencia del Write Model

Por: SiempreListo
cqrswrite-modelpersistenciapostgresqltypescriptgopython

Persistencia del Write Model

En este capitulo implementaremos la persistencia del Write Model usando PostgreSQL. Veremos como guardar y recuperar agregados, y como estructurar las tablas para soportar el modelo de dominio.

Por Que PostgreSQL para el Write Model

El Write Model requiere una base de datos transaccional que garantice:

PostgreSQL cumple todas estas propiedades (ACID) y es una excelente opcion para Write Models.

El Agregado Order

Antes de ver la persistencia, recordemos como luce nuestro agregado. El agregado encapsula las reglas de negocio y genera eventos cuando algo importante ocurre:

// src/domain/order/order.aggregate.ts
import { AggregateRoot } from "../shared/aggregate-root";
import { OrderCreatedEvent, ItemAddedEvent, OrderConfirmedEvent } from "./order.events";

type OrderStatus = "pending" | "confirmed" | "shipped" | "cancelled";

interface OrderItem {
  productId: string;
  quantity: number;
  unitPrice: number;
}

export class OrderAggregate extends AggregateRoot {
  private _customerId: string;
  private _items: OrderItem[] = [];
  private _status: OrderStatus = "pending";
  private _createdAt: Date;

  private constructor(id: string, customerId: string) {
    super(id);
    this._customerId = customerId;
    this._createdAt = new Date();
  }

  static create(params: { customerId: string; items: { productId: string; quantity: number }[] }): OrderAggregate {
    const order = new OrderAggregate(crypto.randomUUID(), params.customerId);
    order.addEvent(new OrderCreatedEvent(order.id, params.customerId));
    return order;
  }

  static reconstitute(data: OrderData): OrderAggregate {
    const order = new OrderAggregate(data.id, data.customerId);
    order._items = data.items;
    order._status = data.status;
    order._createdAt = data.createdAt;
    return order;
  }

  addItem(productId: string, quantity: number, unitPrice: number): void {
    if (this._status !== "pending") {
      throw new Error("Cannot modify non-pending order");
    }
    this._items.push({ productId, quantity, unitPrice });
    this.addEvent(new ItemAddedEvent(this.id, productId, quantity));
  }

  confirm(): void {
    if (this._items.length === 0) throw new Error("Cannot confirm empty order");
    if (this._status !== "pending") throw new Error("Order already processed");
    this._status = "confirmed";
    this.addEvent(new OrderConfirmedEvent(this.id, this.total));
  }

  get customerId(): string { return this._customerId; }
  get items(): readonly OrderItem[] { return this._items; }
  get status(): OrderStatus { return this._status; }
  get total(): number {
    return this._items.reduce((sum, i) => sum + i.quantity * i.unitPrice, 0);
  }
}

Nota dos patrones importantes:

Repository Interface: El Contrato

El repositorio es la abstraccion que oculta los detalles de persistencia. Definimos una interfaz en el dominio; la implementacion va en infraestructura:

// src/domain/order/order.repository.ts
import { OrderAggregate } from "./order.aggregate";

export interface OrderRepository {
  save(order: OrderAggregate): Promise<void>;
  getById(id: string): Promise<OrderAggregate | null>;
  getByCustomerId(customerId: string): Promise<OrderAggregate[]>;
}

PostgreSQL Implementation - TypeScript

Ahora la implementacion concreta que usa PostgreSQL. Nota el uso de transacciones para garantizar atomicidad:

// src/infrastructure/persistence/postgres/order.repository.ts
import { Pool } from "pg";
import { OrderAggregate } from "@domain/order/order.aggregate";
import { OrderRepository } from "@domain/order/order.repository";

export class PostgresOrderRepository implements OrderRepository {
  constructor(private readonly pool: Pool) {}

  async save(order: OrderAggregate): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query("BEGIN");

      await client.query(
        `INSERT INTO orders (id, customer_id, status, total, created_at)
         VALUES ($1, $2, $3, $4, $5)
         ON CONFLICT (id) DO UPDATE SET status = $3, total = $4`,
        [order.id, order.customerId, order.status, order.total, new Date()]
      );

      await client.query("DELETE FROM order_items WHERE order_id = $1", [order.id]);

      for (const item of order.items) {
        await client.query(
          `INSERT INTO order_items (order_id, product_id, quantity, unit_price)
           VALUES ($1, $2, $3, $4)`,
          [order.id, item.productId, item.quantity, item.unitPrice]
        );
      }

      await client.query("COMMIT");
    } catch (error) {
      await client.query("ROLLBACK");
      throw error;
    } finally {
      client.release();
    }
  }

  async getById(id: string): Promise<OrderAggregate | null> {
    const orderResult = await this.pool.query(
      "SELECT * FROM orders WHERE id = $1", [id]
    );
    if (orderResult.rows.length === 0) return null;

    const itemsResult = await this.pool.query(
      "SELECT * FROM order_items WHERE order_id = $1", [id]
    );

    return OrderAggregate.reconstitute({
      id: orderResult.rows[0].id,
      customerId: orderResult.rows[0].customer_id,
      status: orderResult.rows[0].status,
      items: itemsResult.rows.map(r => ({
        productId: r.product_id,
        quantity: r.quantity,
        unitPrice: parseFloat(r.unit_price)
      })),
      createdAt: orderResult.rows[0].created_at
    });
  }
}

PostgreSQL Implementation - Go

// internal/infrastructure/postgres/order_repository.go
package postgres

import (
    "context"
    "database/sql"

    "github.com/company/orderflow/internal/domain/order"
)

type OrderRepository struct {
    db *sql.DB
}

func NewOrderRepository(db *sql.DB) *OrderRepository {
    return &OrderRepository{db: db}
}

func (r *OrderRepository) Save(ctx context.Context, o *order.Aggregate) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    _, err = tx.ExecContext(ctx,
        `INSERT INTO orders (id, customer_id, status, total, created_at)
         VALUES ($1, $2, $3, $4, $5)
         ON CONFLICT (id) DO UPDATE SET status = $3, total = $4`,
        o.ID(), o.CustomerID(), o.Status(), o.Total(), o.CreatedAt(),
    )
    if err != nil {
        return err
    }

    _, err = tx.ExecContext(ctx,
        "DELETE FROM order_items WHERE order_id = $1", o.ID())
    if err != nil {
        return err
    }

    for _, item := range o.Items() {
        _, err = tx.ExecContext(ctx,
            `INSERT INTO order_items (order_id, product_id, quantity, unit_price)
             VALUES ($1, $2, $3, $4)`,
            o.ID(), item.ProductID, item.Quantity, item.UnitPrice,
        )
        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

PostgreSQL Implementation - Python

# src/orderflow/infrastructure/persistence/postgres/order_repository.py
from asyncpg import Pool
from ....domain.order.order_aggregate import OrderAggregate
from ....domain.order.order_repository import OrderRepository as IOrderRepository

class PostgresOrderRepository(IOrderRepository):
    def __init__(self, pool: Pool) -> None:
        self._pool = pool

    async def save(self, order: OrderAggregate) -> None:
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                await conn.execute(
                    """INSERT INTO orders (id, customer_id, status, total, created_at)
                       VALUES ($1, $2, $3, $4, $5)
                       ON CONFLICT (id) DO UPDATE SET status = $3, total = $4""",
                    order.id, order.customer_id, order.status,
                    order.total, order.created_at
                )

                await conn.execute(
                    "DELETE FROM order_items WHERE order_id = $1", order.id
                )

                await conn.executemany(
                    """INSERT INTO order_items
                       (order_id, product_id, quantity, unit_price)
                       VALUES ($1, $2, $3, $4)""",
                    [(order.id, i.product_id, i.quantity, i.unit_price)
                     for i in order.items]
                )

Schema SQL: Estructura de las Tablas

Las tablas del Write Model estan normalizadas para mantener integridad:

CREATE TABLE orders (
    id UUID PRIMARY KEY,
    customer_id UUID NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    total DECIMAL(10, 2) NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
    product_id UUID NOT NULL,
    quantity INTEGER NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL
);

CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_status ON orders(status);

Proximos Pasos

En el siguiente capitulo implementaremos la publicacion de eventos de dominio.


Glosario

Repositorio (Repository)

Definicion: Abstraccion que encapsula la logica de acceso a datos. Proporciona una interfaz orientada a colecciones para acceder a agregados, ocultando los detalles de persistencia.

Por que es importante: Desacopla el dominio de la infraestructura de datos. Puedes cambiar de PostgreSQL a MongoDB sin modificar el dominio.

Ejemplo practico: OrderRepository tiene metodos como save(order) y getById(id). El dominio usa estos metodos sin saber si los datos estan en SQL, NoSQL o memoria.


Transaccion (Transaction)

Definicion: Unidad de trabajo que agrupa multiples operaciones de base de datos. O todas se completan exitosamente (COMMIT) o ninguna se aplica (ROLLBACK).

Por que es importante: Garantiza que los datos permanezcan consistentes. Si falla guardar los items, tambien se revierte el guardado del pedido.

Ejemplo practico: Al guardar un pedido, guardamos la cabecera y los items en una transaccion. Si falla insertar un item, todo se revierte y la base de datos queda como estaba.


ACID

Definicion: Acronimo de Atomicidad, Consistencia, Aislamiento y Durabilidad. Son las propiedades que garantizan la fiabilidad de las transacciones en bases de datos.

Por que es importante: El Write Model requiere estas garantias para mantener la integridad de los datos de negocio.

Ejemplo practico: Gracias a ACID, si el servidor falla a mitad de una transaccion, la base de datos se recupera a un estado consistente sin datos corruptos.


Reconstitute (Reconstituir)

Definicion: Proceso de recrear un agregado a partir de datos almacenados en base de datos, sin ejecutar la logica de creacion normal ni emitir eventos.

Por que es importante: Cuando cargas un agregado existente, no quieres que emita eventos de creacion ni que valide reglas que ya se validaron cuando se creo originalmente.

Ejemplo practico: OrderAggregate.reconstitute(data) reconstruye el agregado desde la BD. No emite OrderCreatedEvent porque el pedido ya existe.


Upsert (Insert or Update)

Definicion: Operacion que inserta un registro si no existe, o lo actualiza si ya existe. Combina INSERT y UPDATE en una sola sentencia.

Por que es importante: Simplifica el codigo al no tener que verificar si el registro existe antes de guardarlo.

Ejemplo practico: ON CONFLICT (id) DO UPDATE en PostgreSQL hace upsert: si el pedido ya existe, actualiza sus campos; si no, lo inserta.


Foreign Key (Clave Foranea)

Definicion: Restriccion que vincula una columna con la clave primaria de otra tabla, garantizando que los valores referenciados existan.

Por que es importante: Mantiene la integridad referencial. No puedes tener items de un pedido que no existe.

Ejemplo practico: order_id UUID NOT NULL REFERENCES orders(id) asegura que cada item pertenece a un pedido real.