Capítulo 13: Agregados y Repositorios en Go
Capítulo 13: Agregados y Repositorios en Go
“El repositorio es el puente entre el dominio y la persistencia”
Conceptos Fundamentales
El Agregado como Máquina de Estados
Un agregado en Event Sourcing es una entidad que:
- Recibe comandos (intenciones de cambio)
- Valida reglas de negocio
- Produce eventos si el comando es válido
- Actualiza su estado interno aplicando esos eventos
El estado nunca se modifica directamente; siempre pasa por eventos. Esto garantiza que podemos reconstruir cualquier estado histórico reproduciendo los eventos.
Eventos No Confirmados (Uncommitted Events)
Cuando un agregado procesa un comando, los eventos generados se almacenan temporalmente en una lista de “eventos no confirmados”. Estos eventos:
- Ya fueron aplicados al estado interno del agregado
- Aún no han sido persistidos en el Event Store
- Se limpian después de que el repositorio los guarda exitosamente
Agregado Order en Go
// internal/domain/aggregates/order/order.go
package order
import (
"errors"
"time"
"github.com/google/uuid"
"github.com/siemprelisto/orderflow-es-go/internal/domain/events"
vo "github.com/siemprelisto/orderflow-es-go/internal/domain/valueobjects"
)
type Status string
const (
StatusDraft Status = "draft"
StatusConfirmed Status = "confirmed"
StatusPaid Status = "paid"
StatusShipped Status = "shipped"
StatusDelivered Status = "delivered"
StatusCancelled Status = "cancelled"
)
type ItemState struct {
ProductID string
ProductName string
SKU string
Quantity int
UnitPrice vo.Money
}
type Order struct {
id string
customerID string
customerEmail string
items map[string]ItemState
shippingAddress vo.Address
status Status
subtotal vo.Money
tax vo.Money
total vo.Money
version int
uncommitted []events.DomainEvent
}
// Getters
func (o *Order) ID() string { return o.id }
func (o *Order) CustomerID() string { return o.customerID }
func (o *Order) Status() Status { return o.status }
func (o *Order) Version() int { return o.version }
func (o *Order) Total() vo.Money { return o.total }
func (o *Order) UncommittedEvents() []events.DomainEvent { return o.uncommitted }
func (o *Order) ClearUncommittedEvents() {
o.uncommitted = nil
}
// Factory: crear nuevo pedido
func Create(
customerID string,
customerEmail string,
items []events.OrderItem,
shippingAddress vo.Address,
) (*Order, error) {
if len(items) == 0 {
return nil, errors.New("order must have at least one item")
}
order := &Order{
items: make(map[string]ItemState),
uncommitted: make([]events.DomainEvent, 0),
}
orderID := uuid.New().String()
// Calcular totales
var subtotal float64
for _, item := range items {
subtotal += item.UnitPrice.Amount * float64(item.Quantity)
}
tax := subtotal * 0.08
total := subtotal + tax
payload := events.OrderCreatedPayload{
CustomerID: customerID,
CustomerEmail: customerEmail,
Items: items,
ShippingAddress: shippingAddress,
Subtotal: vo.Money{Amount: subtotal, Currency: "USD"},
Tax: vo.Money{Amount: tax, Currency: "USD"},
Total: vo.Money{Amount: total, Currency: "USD"},
}
event := events.NewOrderCreated(orderID, 0, payload)
order.apply(event)
return order, nil
}
// Factory: rehidratar desde eventos
func FromEvents(storedEvents []events.DomainEvent) (*Order, error) {
if len(storedEvents) == 0 {
return nil, errors.New("cannot rehydrate order without events")
}
order := &Order{
items: make(map[string]ItemState),
uncommitted: make([]events.DomainEvent, 0),
}
for _, event := range storedEvents {
order.when(event)
}
return order, nil
}
// Commands
func (o *Order) AddItem(item events.OrderItem) error {
if err := o.assertStatus(StatusDraft); err != nil {
return err
}
if _, exists := o.items[item.ProductID]; exists {
return errors.New("product already in order")
}
newSubtotal := o.subtotal.Amount + (item.UnitPrice.Amount * float64(item.Quantity))
newTax := newSubtotal * 0.08
newTotal := newSubtotal + newTax
payload := events.OrderItemAddedPayload{
Item: item,
NewSubtotal: vo.Money{Amount: newSubtotal, Currency: "USD"},
NewTotal: vo.Money{Amount: newTotal, Currency: "USD"},
}
o.applyNew("OrderItemAdded", payload)
return nil
}
func (o *Order) Confirm() error {
if err := o.assertStatus(StatusDraft); err != nil {
return err
}
if len(o.items) == 0 {
return errors.New("cannot confirm empty order")
}
payload := events.OrderConfirmedPayload{
ConfirmedAt: time.Now().UTC(),
EstimatedDelivery: time.Now().UTC().AddDate(0, 0, 5),
}
o.applyNew("OrderConfirmed", payload)
return nil
}
func (o *Order) ReceivePayment(
paymentID string,
amount vo.Money,
method string,
transactionID string,
) error {
if err := o.assertStatus(StatusConfirmed); err != nil {
return err
}
if amount.Amount < o.total.Amount {
return errors.New("payment amount is less than order total")
}
payload := events.PaymentReceivedPayload{
PaymentID: paymentID,
Amount: amount,
Method: method,
TransactionID: transactionID,
PaidAt: time.Now().UTC(),
}
o.applyNew("PaymentReceived", payload)
return nil
}
func (o *Order) Ship(trackingNumber, carrier string) error {
if err := o.assertStatus(StatusPaid); err != nil {
return err
}
payload := events.OrderShippedPayload{
TrackingNumber: trackingNumber,
Carrier: carrier,
ShippedAt: time.Now().UTC(),
EstimatedDelivery: time.Now().UTC().AddDate(0, 0, 3),
}
o.applyNew("OrderShipped", payload)
return nil
}
func (o *Order) Cancel(reason, cancelledBy string) error {
if o.status == StatusShipped || o.status == StatusDelivered || o.status == StatusCancelled {
return errors.New("cannot cancel order in current status")
}
payload := events.OrderCancelledPayload{
Reason: reason,
CancelledBy: cancelledBy,
CancelledAt: time.Now().UTC(),
RefundRequired: o.status == StatusPaid,
}
o.applyNew("OrderCancelled", payload)
return nil
}
// Event handlers
func (o *Order) when(event events.DomainEvent) {
switch event.EventType() {
case "OrderCreated":
o.onOrderCreated(event)
case "OrderItemAdded":
o.onOrderItemAdded(event)
case "OrderConfirmed":
o.status = StatusConfirmed
case "PaymentReceived":
o.status = StatusPaid
case "OrderShipped":
o.status = StatusShipped
case "OrderDelivered":
o.status = StatusDelivered
case "OrderCancelled":
o.status = StatusCancelled
}
o.version = event.Version()
}
func (o *Order) onOrderCreated(event events.DomainEvent) {
payload := event.Payload().(events.OrderCreatedPayload)
o.id = event.AggregateID()
o.customerID = payload.CustomerID
o.customerEmail = payload.CustomerEmail
o.shippingAddress = payload.ShippingAddress
o.subtotal = payload.Subtotal
o.tax = payload.Tax
o.total = payload.Total
o.status = StatusDraft
for _, item := range payload.Items {
o.items[item.ProductID] = ItemState{
ProductID: item.ProductID,
ProductName: item.ProductName,
SKU: item.SKU,
Quantity: item.Quantity,
UnitPrice: item.UnitPrice,
}
}
}
func (o *Order) onOrderItemAdded(event events.DomainEvent) {
payload := event.Payload().(events.OrderItemAddedPayload)
o.items[payload.Item.ProductID] = ItemState{
ProductID: payload.Item.ProductID,
ProductName: payload.Item.ProductName,
SKU: payload.Item.SKU,
Quantity: payload.Item.Quantity,
UnitPrice: payload.Item.UnitPrice,
}
o.subtotal = payload.NewSubtotal
o.total = payload.NewTotal
}
func (o *Order) apply(event events.DomainEvent) {
o.uncommitted = append(o.uncommitted, event)
o.when(event)
}
func (o *Order) applyNew(eventType string, payload interface{}) {
event := events.NewEvent(
eventType,
o.id,
"Order",
o.version+1,
payload,
)
o.apply(event)
}
func (o *Order) assertStatus(expected Status) error {
if o.status != expected {
return errors.New("invalid order status for this operation")
}
return nil
}
Repositorio de Order
El repositorio actúa como intermediario entre el dominio y la infraestructura. Sus responsabilidades son:
- Save: Persiste los eventos no confirmados del agregado en el Event Store
- GetByID: Reconstruye un agregado leyendo y reproduciendo todos sus eventos
El repositorio maneja la serialización/deserialización de eventos y el cálculo de la versión esperada para control de concurrencia.
// internal/infrastructure/repository/order_repository.go
package repository
import (
"context"
"encoding/json"
"fmt"
"github.com/siemprelisto/orderflow-es-go/internal/domain/aggregates/order"
"github.com/siemprelisto/orderflow-es-go/internal/domain/events"
"github.com/siemprelisto/orderflow-es-go/internal/infrastructure/eventstore"
)
type OrderRepository struct {
store eventstore.EventStore
}
func NewOrderRepository(store eventstore.EventStore) *OrderRepository {
return &OrderRepository{store: store}
}
func (r *OrderRepository) Save(ctx context.Context, o *order.Order) error {
uncommitted := o.UncommittedEvents()
if len(uncommitted) == 0 {
return nil
}
streamID := fmt.Sprintf("order-%s", o.ID())
expectedVersion := o.Version() - len(uncommitted)
_, err := r.store.Append(ctx, streamID, uncommitted, expectedVersion)
if err != nil {
return fmt.Errorf("failed to save order: %w", err)
}
o.ClearUncommittedEvents()
return nil
}
func (r *OrderRepository) GetByID(ctx context.Context, orderID string) (*order.Order, error) {
streamID := fmt.Sprintf("order-%s", orderID)
storedEvents, err := r.store.ReadStream(ctx, streamID)
if err != nil {
return nil, fmt.Errorf("failed to read order stream: %w", err)
}
if len(storedEvents) == 0 {
return nil, fmt.Errorf("order not found: %s", orderID)
}
domainEvents, err := r.deserializeEvents(storedEvents)
if err != nil {
return nil, err
}
return order.FromEvents(domainEvents)
}
func (r *OrderRepository) deserializeEvents(stored []eventstore.StoredEvent) ([]events.DomainEvent, error) {
result := make([]events.DomainEvent, len(stored))
for i, se := range stored {
payload, err := r.deserializePayload(se.EventType, se.Data)
if err != nil {
return nil, err
}
var metadata events.EventMetadata
if err := json.Unmarshal(se.Metadata, &metadata); err != nil {
return nil, err
}
result[i] = events.BaseEvent{
ID: fmt.Sprintf("%d", se.GlobalPosition),
Type: se.EventType,
AggID: se.StreamID,
AggType: "Order",
Ver: se.StreamPosition,
Data: payload,
Meta: metadata,
}
}
return result, nil
}
func (r *OrderRepository) deserializePayload(eventType string, data []byte) (interface{}, error) {
switch eventType {
case "OrderCreated":
var payload events.OrderCreatedPayload
if err := json.Unmarshal(data, &payload); err != nil {
return nil, err
}
return payload, nil
case "OrderItemAdded":
var payload events.OrderItemAddedPayload
if err := json.Unmarshal(data, &payload); err != nil {
return nil, err
}
return payload, nil
case "OrderConfirmed":
var payload events.OrderConfirmedPayload
if err := json.Unmarshal(data, &payload); err != nil {
return nil, err
}
return payload, nil
default:
var payload map[string]interface{}
if err := json.Unmarshal(data, &payload); err != nil {
return nil, err
}
return payload, nil
}
}
Testing del Agregado
El patrón Given-When-Then es ideal para testear agregados event-sourced:
- Given: Estado inicial (eventos previos que establecen el contexto)
- When: Acción que ejecutamos (comando)
- Then: Resultado esperado (nuevos eventos emitidos o error)
Este patrón hace los tests legibles y documenta el comportamiento del agregado.
// internal/domain/aggregates/order/order_test.go
package order_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/siemprelisto/orderflow-es-go/internal/domain/aggregates/order"
"github.com/siemprelisto/orderflow-es-go/internal/domain/events"
vo "github.com/siemprelisto/orderflow-es-go/internal/domain/valueobjects"
)
func defaultItem() events.OrderItem {
return events.OrderItem{
ProductID: "prod-1",
ProductName: "Widget",
SKU: "WDG-001",
Quantity: 2,
UnitPrice: vo.Money{Amount: 25, Currency: "USD"},
}
}
func defaultAddress() vo.Address {
return vo.Address{
Street: "123 Main St",
City: "Springfield",
State: "IL",
ZipCode: "62701",
Country: "US",
}
}
func TestOrder_Create(t *testing.T) {
o, err := order.Create(
"customer-1",
"[email protected]",
[]events.OrderItem{defaultItem()},
defaultAddress(),
)
require.NoError(t, err)
assert.Equal(t, order.StatusDraft, o.Status())
assert.Equal(t, 54.0, o.Total().Amount) // 50 + 8% tax
assert.Len(t, o.UncommittedEvents(), 1)
}
func TestOrder_Create_EmptyItems(t *testing.T) {
_, err := order.Create(
"customer-1",
"[email protected]",
[]events.OrderItem{},
defaultAddress(),
)
assert.Error(t, err)
assert.Contains(t, err.Error(), "at least one item")
}
func TestOrder_Confirm(t *testing.T) {
o, _ := order.Create(
"customer-1",
"[email protected]",
[]events.OrderItem{defaultItem()},
defaultAddress(),
)
err := o.Confirm()
require.NoError(t, err)
assert.Equal(t, order.StatusConfirmed, o.Status())
}
func TestOrder_FullLifecycle(t *testing.T) {
// Crear
o, _ := order.Create(
"customer-1",
"[email protected]",
[]events.OrderItem{defaultItem()},
defaultAddress(),
)
assert.Equal(t, order.StatusDraft, o.Status())
// Confirmar
err := o.Confirm()
require.NoError(t, err)
assert.Equal(t, order.StatusConfirmed, o.Status())
// Pagar
err = o.ReceivePayment(
"pay-1",
vo.Money{Amount: 54, Currency: "USD"},
"credit_card",
"txn-123",
)
require.NoError(t, err)
assert.Equal(t, order.StatusPaid, o.Status())
// Enviar
err = o.Ship("TRACK123", "FedEx")
require.NoError(t, err)
assert.Equal(t, order.StatusShipped, o.Status())
}
func TestOrder_InvalidTransitions(t *testing.T) {
o, _ := order.Create(
"customer-1",
"[email protected]",
[]events.OrderItem{defaultItem()},
defaultAddress(),
)
// No puede pagar sin confirmar
err := o.ReceivePayment(
"pay-1",
vo.Money{Amount: 54, Currency: "USD"},
"credit_card",
"txn-123",
)
assert.Error(t, err)
// No puede enviar sin pagar
err = o.Ship("TRACK123", "FedEx")
assert.Error(t, err)
}
Resumen
- El agregado encapsula toda la lógica de negocio
- El repositorio persiste y rehidrata agregados
- La deserialización maneja diferentes tipos de eventos
- Los tests verifican comportamiento y transiciones
- Go ofrece tipado fuerte para eventos y payloads
Glosario
Agregado (Aggregate)
Definición: Cluster de objetos de dominio que se trata como una unidad para cambios de datos. Tiene una raíz (Aggregate Root) que es el único punto de acceso.
Por qué es importante: Define los límites de consistencia transaccional. Todos los cambios dentro de un agregado son atómicos.
Ejemplo práctico: Order es un agregado que contiene items, shippingAddress, y status. No puedes modificar un item directamente; debes hacerlo a través de métodos del Order.
Rehidratación (Rehydration)
Definición: Proceso de reconstruir el estado de un agregado reproduciendo todos sus eventos históricos en orden.
Por qué es importante: Permite recuperar el estado exacto del agregado en cualquier momento, incluso después de reiniciar la aplicación.
Ejemplo práctico: Order.FromEvents(events) toma una lista de eventos y los aplica uno a uno, reconstruyendo el Order con su estado actual.
Método When (Event Handler)
Definición: Método que actualiza el estado interno del agregado cuando se aplica un evento. Es puro: sin efectos secundarios ni validaciones.
Por qué es importante: Separa la decisión (comando) del cambio de estado (evento), permitiendo rehidratación determinista.
Ejemplo práctico: when(OrderConfirmed) simplemente cambia status = StatusConfirmed; no valida si la orden puede confirmarse (eso ya ocurrió al generar el evento).
Transición de Estado Inválida
Definición: Intento de ejecutar un comando que no es válido para el estado actual del agregado.
Por qué es importante: Protege invariantes de negocio. Un pedido enviado no puede volver a “borrador”.
Ejemplo práctico: order.AddItem() falla con error si el status no es “draft”, porque solo puedes agregar items a pedidos no confirmados.
Stream ID
Definición: Identificador único que agrupa todos los eventos de un agregado específico en el Event Store.
Por qué es importante: Permite recuperar solo los eventos relevantes para rehidratar un agregado, sin leer millones de eventos no relacionados.
Ejemplo práctico: order-550e8400-e29b-41d4-a716-446655440000 identifica todos los eventos del pedido con ese UUID.
Expected Version (Versión Esperada)
Definición: Versión del stream que esperamos al momento de escribir nuevos eventos. Si no coincide, hay un conflicto de concurrencia.
Por qué es importante: Implementa control de concurrencia optimista, detectando cuando dos procesos intentan modificar el mismo agregado simultáneamente.
Ejemplo práctico: Si cargamos un Order con versión 5 y otro proceso lo modificó (ahora es 6), nuestro append fallará porque esperábamos 5.
Type Assertion en Go
Definición: Operación que extrae el valor concreto de una interfaz, permitiendo acceder a campos específicos del tipo subyacente.
Por qué es importante: Los eventos se almacenan como interface{} para flexibilidad; necesitamos type assertion para acceder a campos específicos del payload.
Ejemplo práctico: payload := event.Payload().(OrderCreatedPayload) convierte el payload genérico al tipo específico para leer payload.CustomerID.
← Capítulo 12: MongoDB en Go | Capítulo 14: Event Sourcing en Python →