Read Models con Elasticsearch
Read Models con Elasticsearch
En este capitulo implementaremos Read Models usando Elasticsearch, una base de datos de busqueda que es ideal para consultas rapidas y complejas.
Por Que Elasticsearch para Read Models
Elasticsearch es una base de datos orientada a documentos, optimizada para busqueda. Es ideal para Read Models porque ofrece:
- Busqueda full-text: Encuentra pedidos por nombre de cliente, descripcion de producto, etc.
- Consultas rapidas: Indices invertidos permiten busquedas en milisegundos
- Agregaciones: Calcular totales, promedios, agrupaciones para dashboards
- Escalabilidad horizontal: Agrega nodos para manejar mas carga
Mientras PostgreSQL es excelente para el Write Model (transacciones, integridad), Elasticsearch es excelente para el Read Model (busquedas, agregaciones).
Indice Elasticsearch: El Mapping
En Elasticsearch, definimos un mapping que describe la estructura de los documentos. Es similar a un schema de base de datos:
{
"mappings": {
"properties": {
"id": { "type": "keyword" },
"customerId": { "type": "keyword" },
"customerName": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
"customerEmail": { "type": "keyword" },
"items": {
"type": "nested",
"properties": {
"productId": { "type": "keyword" },
"productName": { "type": "text" },
"quantity": { "type": "integer" },
"unitPrice": { "type": "float" },
"subtotal": { "type": "float" }
}
},
"total": { "type": "float" },
"status": { "type": "keyword" },
"createdAt": { "type": "date" }
}
}
}
Nota los diferentes tipos de campos:
keyword: Para busquedas exactas (IDs, status)text: Para busqueda full-text (nombres, descripciones)nested: Para arrays de objetos que necesitan ser consultados individualmentefloat/integer/date: Tipos numericos y temporales
Read Repository - TypeScript
Implementamos el repositorio que interactua con Elasticsearch:
// src/infrastructure/read/elasticsearch-order.repository.ts
import { Client } from "@elastic/elasticsearch";
import { OrderDetailView, OrderListView, PaginatedResult } from "@application/queries/order/order.views";
interface SearchCriteria {
customerId?: string;
status?: string;
fromDate?: Date;
toDate?: Date;
page: number;
pageSize: number;
}
export class ElasticsearchOrderRepository {
private readonly index = "orders";
constructor(private readonly client: Client) {}
async findById(id: string): Promise<OrderDetailView | null> {
try {
const response = await this.client.get({
index: this.index,
id
});
return response._source as OrderDetailView;
} catch (error: any) {
if (error.meta?.statusCode === 404) return null;
throw error;
}
}
async search(criteria: SearchCriteria): Promise<PaginatedResult<OrderListView>> {
const must: any[] = [];
if (criteria.customerId) {
must.push({ term: { customerId: criteria.customerId } });
}
if (criteria.status) {
must.push({ term: { status: criteria.status } });
}
if (criteria.fromDate || criteria.toDate) {
must.push({
range: {
createdAt: {
...(criteria.fromDate && { gte: criteria.fromDate.toISOString() }),
...(criteria.toDate && { lte: criteria.toDate.toISOString() })
}
}
});
}
const from = (criteria.page - 1) * criteria.pageSize;
const response = await this.client.search({
index: this.index,
body: {
query: must.length > 0 ? { bool: { must } } : { match_all: {} },
from,
size: criteria.pageSize,
sort: [{ createdAt: "desc" }]
}
});
const total = typeof response.hits.total === "number"
? response.hits.total
: response.hits.total?.value || 0;
return {
items: response.hits.hits.map(hit => hit._source as OrderListView),
total,
page: criteria.page,
pageSize: criteria.pageSize,
totalPages: Math.ceil(total / criteria.pageSize)
};
}
async save(order: OrderDetailView): Promise<void> {
await this.client.index({
index: this.index,
id: order.id,
document: order,
refresh: true
});
}
async update(id: string, partial: Partial<OrderDetailView>): Promise<void> {
await this.client.update({
index: this.index,
id,
doc: partial,
refresh: true
});
}
}
El metodo search construye dinamicamente la query de Elasticsearch basandose en los criterios proporcionados.
Read Repository - Go
// internal/infrastructure/read/elasticsearch_repository.go
package read
import (
"bytes"
"context"
"encoding/json"
"github.com/elastic/go-elasticsearch/v8"
)
type ElasticsearchOrderRepository struct {
client *elasticsearch.Client
index string
}
func NewElasticsearchOrderRepository(client *elasticsearch.Client) *ElasticsearchOrderRepository {
return &ElasticsearchOrderRepository{client: client, index: "orders"}
}
func (r *ElasticsearchOrderRepository) FindByID(
ctx context.Context,
id string,
) (*OrderDetailView, error) {
res, err := r.client.Get(r.index, id, r.client.Get.WithContext(ctx))
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode == 404 {
return nil, nil
}
var response struct {
Source OrderDetailView `json:"_source"`
}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return nil, err
}
return &response.Source, nil
}
func (r *ElasticsearchOrderRepository) Search(
ctx context.Context,
criteria SearchCriteria,
) (*PaginatedResult[OrderListView], error) {
query := buildQuery(criteria)
from := (criteria.Page - 1) * criteria.PageSize
body := map[string]any{
"query": query,
"from": from,
"size": criteria.PageSize,
"sort": []map[string]string{{"createdAt": "desc"}},
}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(body)
res, err := r.client.Search(
r.client.Search.WithContext(ctx),
r.client.Search.WithIndex(r.index),
r.client.Search.WithBody(&buf),
)
if err != nil {
return nil, err
}
defer res.Body.Close()
var response searchResponse
json.NewDecoder(res.Body).Decode(&response)
items := make([]OrderListView, len(response.Hits.Hits))
for i, hit := range response.Hits.Hits {
items[i] = hit.Source
}
return &PaginatedResult[OrderListView]{
Items: items,
Total: response.Hits.Total.Value,
Page: criteria.Page,
PageSize: criteria.PageSize,
TotalPages: (response.Hits.Total.Value + criteria.PageSize - 1) / criteria.PageSize,
}, nil
}
Read Repository - Python
# src/orderflow/infrastructure/read/elasticsearch_repository.py
from elasticsearch import AsyncElasticsearch, NotFoundError
from ...application.queries.order.views import OrderDetailView, OrderListView, PaginatedResult
class ElasticsearchOrderRepository:
def __init__(self, client: AsyncElasticsearch) -> None:
self._client = client
self._index = "orders"
async def find_by_id(self, id: str) -> OrderDetailView | None:
try:
response = await self._client.get(index=self._index, id=id)
return OrderDetailView(**response["_source"])
except NotFoundError:
return None
async def search(
self,
customer_id: str | None = None,
status: str | None = None,
from_date: datetime | None = None,
to_date: datetime | None = None,
page: int = 1,
page_size: int = 20
) -> PaginatedResult[OrderListView]:
must = []
if customer_id:
must.append({"term": {"customerId": customer_id}})
if status:
must.append({"term": {"status": status}})
if from_date or to_date:
range_query = {}
if from_date:
range_query["gte"] = from_date.isoformat()
if to_date:
range_query["lte"] = to_date.isoformat()
must.append({"range": {"createdAt": range_query}})
query = {"bool": {"must": must}} if must else {"match_all": {}}
from_offset = (page - 1) * page_size
response = await self._client.search(
index=self._index,
body={
"query": query,
"from": from_offset,
"size": page_size,
"sort": [{"createdAt": "desc"}]
}
)
total = response["hits"]["total"]["value"]
items = tuple(
OrderListView(**hit["_source"])
for hit in response["hits"]["hits"]
)
return PaginatedResult(
items=items,
total=total,
page=page,
page_size=page_size,
total_pages=(total + page_size - 1) // page_size
)
Proyeccion Completa: Conectando Todo
La proyeccion escucha eventos y actualiza Elasticsearch. Nota como enriquece los datos consultando otros repositorios (cliente, producto):
// src/infrastructure/projections/order.projection.ts
export class OrderProjection {
constructor(
private readonly eventBus: EventBus,
private readonly readRepo: ElasticsearchOrderRepository,
private readonly customerRepo: CustomerReadRepository,
private readonly productRepo: ProductReadRepository
) {
this.registerHandlers();
}
private registerHandlers(): void {
this.eventBus.subscribe<OrderCreatedEvent>("order.created", this.onOrderCreated.bind(this));
this.eventBus.subscribe<ItemAddedEvent>("order.item_added", this.onItemAdded.bind(this));
this.eventBus.subscribe<OrderConfirmedEvent>("order.confirmed", this.onOrderConfirmed.bind(this));
}
private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
const customer = await this.customerRepo.findById(event.customerId);
await this.readRepo.save({
id: event.aggregateId,
customerId: event.customerId,
customerName: customer?.name || "Unknown",
customerEmail: customer?.email || "",
items: [],
total: 0,
status: "pending",
createdAt: event.occurredAt.toISOString()
});
}
private async onItemAdded(event: ItemAddedEvent): Promise<void> {
const product = await this.productRepo.findById(event.productId);
const order = await this.readRepo.findById(event.aggregateId);
if (!order) return;
const subtotal = event.quantity * (product?.price || 0);
const newItem = {
productId: event.productId,
productName: product?.name || "Unknown",
quantity: event.quantity,
unitPrice: product?.price || 0,
subtotal
};
await this.readRepo.update(event.aggregateId, {
items: [...order.items, newItem],
total: order.total + subtotal
});
}
private async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
await this.readRepo.update(event.aggregateId, {
status: "confirmed",
total: event.total
});
}
}
Esta proyeccion ilustra un concepto importante: el Read Model puede consultar multiples fuentes para construir una vista completa. Cuando se agrega un item, busca el producto para obtener su nombre y precio.
Resumen de los Capitulos 1-12
En estos capitulos has aprendido los fundamentos de CQRS:
- Separacion de modelos: Write Model para consistencia, Read Model para consultas
- Commands: Intenciones inmutables validadas con Zod/Pydantic
- Queries: Consultas especificas que retornan Read Models
- Consistencia eventual: Estrategias para manejar el lag (Optimistic UI, Read Your Writes, SSE)
- Buses: Command Bus y Query Bus para despacho de mensajes
- Persistencia: PostgreSQL para Write Model, Elasticsearch para Read Model
- Eventos: Domain Events conectan Write Model con proyecciones
Los siguientes capitulos cubriran temas avanzados como proyecciones complejas, cache con Redis, e implementaciones en Go y Python.
Glosario
Elasticsearch
Definicion: Motor de busqueda y analisis distribuido, basado en Apache Lucene. Almacena documentos JSON y permite busquedas rapidas, agregaciones y analisis en tiempo real.
Por que es importante: Es ideal para Read Models porque esta optimizado para consultas, no para transacciones. Escala horizontalmente para manejar grandes volumenes de lectura.
Ejemplo practico: Almacenamos OrderDetailView como documentos JSON en Elasticsearch. Podemos buscar por nombre de cliente, filtrar por fecha, paginar resultados, todo en milisegundos.
Mapping
Definicion: En Elasticsearch, el mapping define como se almacenan e indexan los campos de un documento. Especifica tipos de datos y opciones de indexacion.
Por que es importante: Un buen mapping permite consultas eficientes. Definir el tipo correcto (keyword vs text) afecta como puedes buscar.
Ejemplo practico: customerName como text permite buscar “john” y encontrar “John Smith”. Como keyword, solo encuentra coincidencias exactas.
Full-text Search (Busqueda de Texto Completo)
Definicion: Tecnica de busqueda que analiza el contenido de texto, tokenizandolo en palabras individuales, para encontrar coincidencias parciales y relevantes.
Por que es importante: Permite buscar “laptop” y encontrar documentos que contienen “laptops”, “Laptop”, “LAPTOP”. Mas flexible que busqueda exacta.
Ejemplo practico: Buscar “gaming laptop” encuentra pedidos con productos que contienen “gaming” o “laptop” en su descripcion.
Document (Documento)
Definicion: En bases de datos de documentos como Elasticsearch, la unidad basica de informacion. Es un objeto JSON que representa una entidad.
Por que es importante: Los Read Models se almacenan como documentos completos, ya desnormalizados, listos para ser retornados sin procesamiento adicional.
Ejemplo practico: Un documento de pedido en Elasticsearch contiene toda la informacion necesaria para mostrar el detalle: datos del cliente, items con nombres de productos, totales precalculados.
Index (Indice)
Definicion: En Elasticsearch, un indice es una coleccion de documentos con caracteristicas similares. Es analogo a una tabla en bases de datos relacionales.
Por que es importante: Organizamos los Read Models en indices separados: orders, products, customers. Cada uno con su mapping optimizado.
Ejemplo practico: El indice orders contiene todos los documentos de pedidos. Podemos buscar solo en este indice o en multiples indices a la vez.
Nested (Anidado)
Definicion: Tipo especial de campo en Elasticsearch para arrays de objetos que necesitan ser consultados de forma independiente.
Por que es importante: Sin nested, buscar “items con productName=‘laptop’ y quantity > 5” podria dar falsos positivos mezclando campos de diferentes items.
Ejemplo practico: Los items de un pedido son nested, permitiendo consultar “pedidos donde algun item tiene productName=‘laptop’ Y ese mismo item tiene quantity > 5”.