← Volver al listado de tecnologías

Read Models con Elasticsearch

Por: SiempreListo
cqrselasticsearchread-modelstypescriptgopython

Read Models con Elasticsearch

En este capitulo implementaremos Read Models usando Elasticsearch, una base de datos de busqueda que es ideal para consultas rapidas y complejas.

Por Que Elasticsearch para Read Models

Elasticsearch es una base de datos orientada a documentos, optimizada para busqueda. Es ideal para Read Models porque ofrece:

Mientras PostgreSQL es excelente para el Write Model (transacciones, integridad), Elasticsearch es excelente para el Read Model (busquedas, agregaciones).

Indice Elasticsearch: El Mapping

En Elasticsearch, definimos un mapping que describe la estructura de los documentos. Es similar a un schema de base de datos:

{
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "customerId": { "type": "keyword" },
      "customerName": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
      "customerEmail": { "type": "keyword" },
      "items": {
        "type": "nested",
        "properties": {
          "productId": { "type": "keyword" },
          "productName": { "type": "text" },
          "quantity": { "type": "integer" },
          "unitPrice": { "type": "float" },
          "subtotal": { "type": "float" }
        }
      },
      "total": { "type": "float" },
      "status": { "type": "keyword" },
      "createdAt": { "type": "date" }
    }
  }
}

Nota los diferentes tipos de campos:

Read Repository - TypeScript

Implementamos el repositorio que interactua con Elasticsearch:

// src/infrastructure/read/elasticsearch-order.repository.ts
import { Client } from "@elastic/elasticsearch";
import { OrderDetailView, OrderListView, PaginatedResult } from "@application/queries/order/order.views";

interface SearchCriteria {
  customerId?: string;
  status?: string;
  fromDate?: Date;
  toDate?: Date;
  page: number;
  pageSize: number;
}

export class ElasticsearchOrderRepository {
  private readonly index = "orders";

  constructor(private readonly client: Client) {}

  async findById(id: string): Promise<OrderDetailView | null> {
    try {
      const response = await this.client.get({
        index: this.index,
        id
      });
      return response._source as OrderDetailView;
    } catch (error: any) {
      if (error.meta?.statusCode === 404) return null;
      throw error;
    }
  }

  async search(criteria: SearchCriteria): Promise<PaginatedResult<OrderListView>> {
    const must: any[] = [];

    if (criteria.customerId) {
      must.push({ term: { customerId: criteria.customerId } });
    }
    if (criteria.status) {
      must.push({ term: { status: criteria.status } });
    }
    if (criteria.fromDate || criteria.toDate) {
      must.push({
        range: {
          createdAt: {
            ...(criteria.fromDate && { gte: criteria.fromDate.toISOString() }),
            ...(criteria.toDate && { lte: criteria.toDate.toISOString() })
          }
        }
      });
    }

    const from = (criteria.page - 1) * criteria.pageSize;

    const response = await this.client.search({
      index: this.index,
      body: {
        query: must.length > 0 ? { bool: { must } } : { match_all: {} },
        from,
        size: criteria.pageSize,
        sort: [{ createdAt: "desc" }]
      }
    });

    const total = typeof response.hits.total === "number"
      ? response.hits.total
      : response.hits.total?.value || 0;

    return {
      items: response.hits.hits.map(hit => hit._source as OrderListView),
      total,
      page: criteria.page,
      pageSize: criteria.pageSize,
      totalPages: Math.ceil(total / criteria.pageSize)
    };
  }

  async save(order: OrderDetailView): Promise<void> {
    await this.client.index({
      index: this.index,
      id: order.id,
      document: order,
      refresh: true
    });
  }

  async update(id: string, partial: Partial<OrderDetailView>): Promise<void> {
    await this.client.update({
      index: this.index,
      id,
      doc: partial,
      refresh: true
    });
  }
}

El metodo search construye dinamicamente la query de Elasticsearch basandose en los criterios proporcionados.

Read Repository - Go

// internal/infrastructure/read/elasticsearch_repository.go
package read

import (
    "bytes"
    "context"
    "encoding/json"

    "github.com/elastic/go-elasticsearch/v8"
)

type ElasticsearchOrderRepository struct {
    client *elasticsearch.Client
    index  string
}

func NewElasticsearchOrderRepository(client *elasticsearch.Client) *ElasticsearchOrderRepository {
    return &ElasticsearchOrderRepository{client: client, index: "orders"}
}

func (r *ElasticsearchOrderRepository) FindByID(
    ctx context.Context,
    id string,
) (*OrderDetailView, error) {
    res, err := r.client.Get(r.index, id, r.client.Get.WithContext(ctx))
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()

    if res.StatusCode == 404 {
        return nil, nil
    }

    var response struct {
        Source OrderDetailView `json:"_source"`
    }
    if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
        return nil, err
    }
    return &response.Source, nil
}

func (r *ElasticsearchOrderRepository) Search(
    ctx context.Context,
    criteria SearchCriteria,
) (*PaginatedResult[OrderListView], error) {
    query := buildQuery(criteria)
    from := (criteria.Page - 1) * criteria.PageSize

    body := map[string]any{
        "query": query,
        "from":  from,
        "size":  criteria.PageSize,
        "sort":  []map[string]string{{"createdAt": "desc"}},
    }

    var buf bytes.Buffer
    json.NewEncoder(&buf).Encode(body)

    res, err := r.client.Search(
        r.client.Search.WithContext(ctx),
        r.client.Search.WithIndex(r.index),
        r.client.Search.WithBody(&buf),
    )
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()

    var response searchResponse
    json.NewDecoder(res.Body).Decode(&response)

    items := make([]OrderListView, len(response.Hits.Hits))
    for i, hit := range response.Hits.Hits {
        items[i] = hit.Source
    }

    return &PaginatedResult[OrderListView]{
        Items:      items,
        Total:      response.Hits.Total.Value,
        Page:       criteria.Page,
        PageSize:   criteria.PageSize,
        TotalPages: (response.Hits.Total.Value + criteria.PageSize - 1) / criteria.PageSize,
    }, nil
}

Read Repository - Python

# src/orderflow/infrastructure/read/elasticsearch_repository.py
from elasticsearch import AsyncElasticsearch, NotFoundError
from ...application.queries.order.views import OrderDetailView, OrderListView, PaginatedResult

class ElasticsearchOrderRepository:
    def __init__(self, client: AsyncElasticsearch) -> None:
        self._client = client
        self._index = "orders"

    async def find_by_id(self, id: str) -> OrderDetailView | None:
        try:
            response = await self._client.get(index=self._index, id=id)
            return OrderDetailView(**response["_source"])
        except NotFoundError:
            return None

    async def search(
        self,
        customer_id: str | None = None,
        status: str | None = None,
        from_date: datetime | None = None,
        to_date: datetime | None = None,
        page: int = 1,
        page_size: int = 20
    ) -> PaginatedResult[OrderListView]:
        must = []

        if customer_id:
            must.append({"term": {"customerId": customer_id}})
        if status:
            must.append({"term": {"status": status}})
        if from_date or to_date:
            range_query = {}
            if from_date:
                range_query["gte"] = from_date.isoformat()
            if to_date:
                range_query["lte"] = to_date.isoformat()
            must.append({"range": {"createdAt": range_query}})

        query = {"bool": {"must": must}} if must else {"match_all": {}}
        from_offset = (page - 1) * page_size

        response = await self._client.search(
            index=self._index,
            body={
                "query": query,
                "from": from_offset,
                "size": page_size,
                "sort": [{"createdAt": "desc"}]
            }
        )

        total = response["hits"]["total"]["value"]
        items = tuple(
            OrderListView(**hit["_source"])
            for hit in response["hits"]["hits"]
        )

        return PaginatedResult(
            items=items,
            total=total,
            page=page,
            page_size=page_size,
            total_pages=(total + page_size - 1) // page_size
        )

Proyeccion Completa: Conectando Todo

La proyeccion escucha eventos y actualiza Elasticsearch. Nota como enriquece los datos consultando otros repositorios (cliente, producto):

// src/infrastructure/projections/order.projection.ts
export class OrderProjection {
  constructor(
    private readonly eventBus: EventBus,
    private readonly readRepo: ElasticsearchOrderRepository,
    private readonly customerRepo: CustomerReadRepository,
    private readonly productRepo: ProductReadRepository
  ) {
    this.registerHandlers();
  }

  private registerHandlers(): void {
    this.eventBus.subscribe<OrderCreatedEvent>("order.created", this.onOrderCreated.bind(this));
    this.eventBus.subscribe<ItemAddedEvent>("order.item_added", this.onItemAdded.bind(this));
    this.eventBus.subscribe<OrderConfirmedEvent>("order.confirmed", this.onOrderConfirmed.bind(this));
  }

  private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const customer = await this.customerRepo.findById(event.customerId);

    await this.readRepo.save({
      id: event.aggregateId,
      customerId: event.customerId,
      customerName: customer?.name || "Unknown",
      customerEmail: customer?.email || "",
      items: [],
      total: 0,
      status: "pending",
      createdAt: event.occurredAt.toISOString()
    });
  }

  private async onItemAdded(event: ItemAddedEvent): Promise<void> {
    const product = await this.productRepo.findById(event.productId);
    const order = await this.readRepo.findById(event.aggregateId);
    if (!order) return;

    const subtotal = event.quantity * (product?.price || 0);
    const newItem = {
      productId: event.productId,
      productName: product?.name || "Unknown",
      quantity: event.quantity,
      unitPrice: product?.price || 0,
      subtotal
    };

    await this.readRepo.update(event.aggregateId, {
      items: [...order.items, newItem],
      total: order.total + subtotal
    });
  }

  private async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
    await this.readRepo.update(event.aggregateId, {
      status: "confirmed",
      total: event.total
    });
  }
}

Esta proyeccion ilustra un concepto importante: el Read Model puede consultar multiples fuentes para construir una vista completa. Cuando se agrega un item, busca el producto para obtener su nombre y precio.

Resumen de los Capitulos 1-12

En estos capitulos has aprendido los fundamentos de CQRS:

  1. Separacion de modelos: Write Model para consistencia, Read Model para consultas
  2. Commands: Intenciones inmutables validadas con Zod/Pydantic
  3. Queries: Consultas especificas que retornan Read Models
  4. Consistencia eventual: Estrategias para manejar el lag (Optimistic UI, Read Your Writes, SSE)
  5. Buses: Command Bus y Query Bus para despacho de mensajes
  6. Persistencia: PostgreSQL para Write Model, Elasticsearch para Read Model
  7. Eventos: Domain Events conectan Write Model con proyecciones

Los siguientes capitulos cubriran temas avanzados como proyecciones complejas, cache con Redis, e implementaciones en Go y Python.


Glosario

Elasticsearch

Definicion: Motor de busqueda y analisis distribuido, basado en Apache Lucene. Almacena documentos JSON y permite busquedas rapidas, agregaciones y analisis en tiempo real.

Por que es importante: Es ideal para Read Models porque esta optimizado para consultas, no para transacciones. Escala horizontalmente para manejar grandes volumenes de lectura.

Ejemplo practico: Almacenamos OrderDetailView como documentos JSON en Elasticsearch. Podemos buscar por nombre de cliente, filtrar por fecha, paginar resultados, todo en milisegundos.


Mapping

Definicion: En Elasticsearch, el mapping define como se almacenan e indexan los campos de un documento. Especifica tipos de datos y opciones de indexacion.

Por que es importante: Un buen mapping permite consultas eficientes. Definir el tipo correcto (keyword vs text) afecta como puedes buscar.

Ejemplo practico: customerName como text permite buscar “john” y encontrar “John Smith”. Como keyword, solo encuentra coincidencias exactas.


Full-text Search (Busqueda de Texto Completo)

Definicion: Tecnica de busqueda que analiza el contenido de texto, tokenizandolo en palabras individuales, para encontrar coincidencias parciales y relevantes.

Por que es importante: Permite buscar “laptop” y encontrar documentos que contienen “laptops”, “Laptop”, “LAPTOP”. Mas flexible que busqueda exacta.

Ejemplo practico: Buscar “gaming laptop” encuentra pedidos con productos que contienen “gaming” o “laptop” en su descripcion.


Document (Documento)

Definicion: En bases de datos de documentos como Elasticsearch, la unidad basica de informacion. Es un objeto JSON que representa una entidad.

Por que es importante: Los Read Models se almacenan como documentos completos, ya desnormalizados, listos para ser retornados sin procesamiento adicional.

Ejemplo practico: Un documento de pedido en Elasticsearch contiene toda la informacion necesaria para mostrar el detalle: datos del cliente, items con nombres de productos, totales precalculados.


Index (Indice)

Definicion: En Elasticsearch, un indice es una coleccion de documentos con caracteristicas similares. Es analogo a una tabla en bases de datos relacionales.

Por que es importante: Organizamos los Read Models en indices separados: orders, products, customers. Cada uno con su mapping optimizado.

Ejemplo practico: El indice orders contiene todos los documentos de pedidos. Podemos buscar solo en este indice o en multiples indices a la vez.


Nested (Anidado)

Definicion: Tipo especial de campo en Elasticsearch para arrays de objetos que necesitan ser consultados de forma independiente.

Por que es importante: Sin nested, buscar “items con productName=‘laptop’ y quantity > 5” podria dar falsos positivos mezclando campos de diferentes items.

Ejemplo practico: Los items de un pedido son nested, permitiendo consultar “pedidos donde algun item tiene productName=‘laptop’ Y ese mismo item tiene quantity > 5”.