Persistencia del Write Model
Persistencia del Write Model
En este capitulo implementaremos la persistencia del Write Model usando PostgreSQL. Veremos como guardar y recuperar agregados, y como estructurar las tablas para soportar el modelo de dominio.
Por Que PostgreSQL para el Write Model
El Write Model requiere una base de datos transaccional que garantice:
- Atomicidad: Todas las operaciones de una transaccion se completan o ninguna
- Consistencia: La base de datos pasa de un estado valido a otro estado valido
- Aislamiento: Transacciones concurrentes no interfieren entre si
- Durabilidad: Los cambios confirmados persisten aunque falle el sistema
PostgreSQL cumple todas estas propiedades (ACID) y es una excelente opcion para Write Models.
El Agregado Order
Antes de ver la persistencia, recordemos como luce nuestro agregado. El agregado encapsula las reglas de negocio y genera eventos cuando algo importante ocurre:
// src/domain/order/order.aggregate.ts
import { AggregateRoot } from "../shared/aggregate-root";
import { OrderCreatedEvent, ItemAddedEvent, OrderConfirmedEvent } from "./order.events";
type OrderStatus = "pending" | "confirmed" | "shipped" | "cancelled";
interface OrderItem {
productId: string;
quantity: number;
unitPrice: number;
}
export class OrderAggregate extends AggregateRoot {
private _customerId: string;
private _items: OrderItem[] = [];
private _status: OrderStatus = "pending";
private _createdAt: Date;
private constructor(id: string, customerId: string) {
super(id);
this._customerId = customerId;
this._createdAt = new Date();
}
static create(params: { customerId: string; items: { productId: string; quantity: number }[] }): OrderAggregate {
const order = new OrderAggregate(crypto.randomUUID(), params.customerId);
order.addEvent(new OrderCreatedEvent(order.id, params.customerId));
return order;
}
static reconstitute(data: OrderData): OrderAggregate {
const order = new OrderAggregate(data.id, data.customerId);
order._items = data.items;
order._status = data.status;
order._createdAt = data.createdAt;
return order;
}
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this._status !== "pending") {
throw new Error("Cannot modify non-pending order");
}
this._items.push({ productId, quantity, unitPrice });
this.addEvent(new ItemAddedEvent(this.id, productId, quantity));
}
confirm(): void {
if (this._items.length === 0) throw new Error("Cannot confirm empty order");
if (this._status !== "pending") throw new Error("Order already processed");
this._status = "confirmed";
this.addEvent(new OrderConfirmedEvent(this.id, this.total));
}
get customerId(): string { return this._customerId; }
get items(): readonly OrderItem[] { return this._items; }
get status(): OrderStatus { return this._status; }
get total(): number {
return this._items.reduce((sum, i) => sum + i.quantity * i.unitPrice, 0);
}
}
Nota dos patrones importantes:
create(): Factory method que crea nuevos agregados y emite eventosreconstitute(): Reconstruye el agregado desde datos de BD (sin emitir eventos)
Repository Interface: El Contrato
El repositorio es la abstraccion que oculta los detalles de persistencia. Definimos una interfaz en el dominio; la implementacion va en infraestructura:
// src/domain/order/order.repository.ts
import { OrderAggregate } from "./order.aggregate";
export interface OrderRepository {
save(order: OrderAggregate): Promise<void>;
getById(id: string): Promise<OrderAggregate | null>;
getByCustomerId(customerId: string): Promise<OrderAggregate[]>;
}
PostgreSQL Implementation - TypeScript
Ahora la implementacion concreta que usa PostgreSQL. Nota el uso de transacciones para garantizar atomicidad:
// src/infrastructure/persistence/postgres/order.repository.ts
import { Pool } from "pg";
import { OrderAggregate } from "@domain/order/order.aggregate";
import { OrderRepository } from "@domain/order/order.repository";
export class PostgresOrderRepository implements OrderRepository {
constructor(private readonly pool: Pool) {}
async save(order: OrderAggregate): Promise<void> {
const client = await this.pool.connect();
try {
await client.query("BEGIN");
await client.query(
`INSERT INTO orders (id, customer_id, status, total, created_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE SET status = $3, total = $4`,
[order.id, order.customerId, order.status, order.total, new Date()]
);
await client.query("DELETE FROM order_items WHERE order_id = $1", [order.id]);
for (const item of order.items) {
await client.query(
`INSERT INTO order_items (order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)`,
[order.id, item.productId, item.quantity, item.unitPrice]
);
}
await client.query("COMMIT");
} catch (error) {
await client.query("ROLLBACK");
throw error;
} finally {
client.release();
}
}
async getById(id: string): Promise<OrderAggregate | null> {
const orderResult = await this.pool.query(
"SELECT * FROM orders WHERE id = $1", [id]
);
if (orderResult.rows.length === 0) return null;
const itemsResult = await this.pool.query(
"SELECT * FROM order_items WHERE order_id = $1", [id]
);
return OrderAggregate.reconstitute({
id: orderResult.rows[0].id,
customerId: orderResult.rows[0].customer_id,
status: orderResult.rows[0].status,
items: itemsResult.rows.map(r => ({
productId: r.product_id,
quantity: r.quantity,
unitPrice: parseFloat(r.unit_price)
})),
createdAt: orderResult.rows[0].created_at
});
}
}
PostgreSQL Implementation - Go
// internal/infrastructure/postgres/order_repository.go
package postgres
import (
"context"
"database/sql"
"github.com/company/orderflow/internal/domain/order"
)
type OrderRepository struct {
db *sql.DB
}
func NewOrderRepository(db *sql.DB) *OrderRepository {
return &OrderRepository{db: db}
}
func (r *OrderRepository) Save(ctx context.Context, o *order.Aggregate) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.ExecContext(ctx,
`INSERT INTO orders (id, customer_id, status, total, created_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE SET status = $3, total = $4`,
o.ID(), o.CustomerID(), o.Status(), o.Total(), o.CreatedAt(),
)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx,
"DELETE FROM order_items WHERE order_id = $1", o.ID())
if err != nil {
return err
}
for _, item := range o.Items() {
_, err = tx.ExecContext(ctx,
`INSERT INTO order_items (order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)`,
o.ID(), item.ProductID, item.Quantity, item.UnitPrice,
)
if err != nil {
return err
}
}
return tx.Commit()
}
PostgreSQL Implementation - Python
# src/orderflow/infrastructure/persistence/postgres/order_repository.py
from asyncpg import Pool
from ....domain.order.order_aggregate import OrderAggregate
from ....domain.order.order_repository import OrderRepository as IOrderRepository
class PostgresOrderRepository(IOrderRepository):
def __init__(self, pool: Pool) -> None:
self._pool = pool
async def save(self, order: OrderAggregate) -> None:
async with self._pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
"""INSERT INTO orders (id, customer_id, status, total, created_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE SET status = $3, total = $4""",
order.id, order.customer_id, order.status,
order.total, order.created_at
)
await conn.execute(
"DELETE FROM order_items WHERE order_id = $1", order.id
)
await conn.executemany(
"""INSERT INTO order_items
(order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)""",
[(order.id, i.product_id, i.quantity, i.unit_price)
for i in order.items]
)
Schema SQL: Estructura de las Tablas
Las tablas del Write Model estan normalizadas para mantener integridad:
CREATE TABLE orders (
id UUID PRIMARY KEY,
customer_id UUID NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
total DECIMAL(10, 2) NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
product_id UUID NOT NULL,
quantity INTEGER NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL
);
CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_status ON orders(status);
Proximos Pasos
En el siguiente capitulo implementaremos la publicacion de eventos de dominio.
Glosario
Repositorio (Repository)
Definicion: Abstraccion que encapsula la logica de acceso a datos. Proporciona una interfaz orientada a colecciones para acceder a agregados, ocultando los detalles de persistencia.
Por que es importante: Desacopla el dominio de la infraestructura de datos. Puedes cambiar de PostgreSQL a MongoDB sin modificar el dominio.
Ejemplo practico: OrderRepository tiene metodos como save(order) y getById(id). El dominio usa estos metodos sin saber si los datos estan en SQL, NoSQL o memoria.
Transaccion (Transaction)
Definicion: Unidad de trabajo que agrupa multiples operaciones de base de datos. O todas se completan exitosamente (COMMIT) o ninguna se aplica (ROLLBACK).
Por que es importante: Garantiza que los datos permanezcan consistentes. Si falla guardar los items, tambien se revierte el guardado del pedido.
Ejemplo practico: Al guardar un pedido, guardamos la cabecera y los items en una transaccion. Si falla insertar un item, todo se revierte y la base de datos queda como estaba.
ACID
Definicion: Acronimo de Atomicidad, Consistencia, Aislamiento y Durabilidad. Son las propiedades que garantizan la fiabilidad de las transacciones en bases de datos.
Por que es importante: El Write Model requiere estas garantias para mantener la integridad de los datos de negocio.
Ejemplo practico: Gracias a ACID, si el servidor falla a mitad de una transaccion, la base de datos se recupera a un estado consistente sin datos corruptos.
Reconstitute (Reconstituir)
Definicion: Proceso de recrear un agregado a partir de datos almacenados en base de datos, sin ejecutar la logica de creacion normal ni emitir eventos.
Por que es importante: Cuando cargas un agregado existente, no quieres que emita eventos de creacion ni que valide reglas que ya se validaron cuando se creo originalmente.
Ejemplo practico: OrderAggregate.reconstitute(data) reconstruye el agregado desde la BD. No emite OrderCreatedEvent porque el pedido ya existe.
Upsert (Insert or Update)
Definicion: Operacion que inserta un registro si no existe, o lo actualiza si ya existe. Combina INSERT y UPDATE en una sola sentencia.
Por que es importante: Simplifica el codigo al no tener que verificar si el registro existe antes de guardarlo.
Ejemplo practico: ON CONFLICT (id) DO UPDATE en PostgreSQL hace upsert: si el pedido ya existe, actualiza sus campos; si no, lo inserta.
Foreign Key (Clave Foranea)
Definicion: Restriccion que vincula una columna con la clave primaria de otra tabla, garantizando que los valores referenciados existan.
Por que es importante: Mantiene la integridad referencial. No puedes tener items de un pedido que no existe.
Ejemplo practico: order_id UUID NOT NULL REFERENCES orders(id) asegura que cada item pertenece a un pedido real.