← Volver al listado de tecnologías

Query Bus y Handlers

Por: SiempreListo
cqrsquery-bushandlerstypescriptgopython

Query Bus y Handlers

En este capitulo implementaremos el Query Bus, el componente que conecta las consultas con sus handlers. Veremos como estructurar queries especificas y handlers eficientes.

Que es el Query Bus

El Query Bus funciona de manera similar al Command Bus, pero para consultas. Recibe una query, encuentra el handler registrado y retorna los datos solicitados.

La diferencia principal es que las queries siempre retornan datos (a diferencia de los commands que tipicamente retornan void).

Implementacion del Query Bus

Query Bus - TypeScript

El Query Bus usa genericos para garantizar que el tipo de retorno coincida con lo que la query promete:

// src/application/queries/query-bus.ts
import { Query, QueryHandler } from "./query";

type QueryClass<T extends Query<unknown>> = new (...args: unknown[]) => T;
type ExtractResult<T> = T extends Query<infer R> ? R : never;

export class QueryBus {
  private handlers = new Map<string, QueryHandler<Query<unknown>, unknown>>();

  register<T extends Query<R>, R>(
    queryClass: QueryClass<T>,
    handler: QueryHandler<T, R>
  ): void {
    this.handlers.set(
      queryClass.name,
      handler as QueryHandler<Query<unknown>, unknown>
    );
  }

  async execute<T extends Query<R>, R = ExtractResult<T>>(query: T): Promise<R> {
    const handler = this.handlers.get(query.constructor.name);

    if (!handler) {
      throw new Error(`No handler for ${query.constructor.name}`);
    }

    return handler.execute(query) as Promise<R>;
  }
}

Query Bus - Go

// internal/application/query/bus.go
package query

import (
    "context"
    "fmt"
    "reflect"
)

type Bus struct {
    handlers map[string]any
}

func NewBus() *Bus {
    return &Bus{handlers: make(map[string]any)}
}

func Register[Q any, R any](b *Bus, handler Handler[Q, R]) {
    typeName := reflect.TypeOf((*Q)(nil)).Elem().Name()
    b.handlers[typeName] = handler
}

func Execute[Q any, R any](b *Bus, ctx context.Context, q Q) (R, error) {
    typeName := reflect.TypeOf(q).Name()
    handler, ok := b.handlers[typeName]
    if !ok {
        var zero R
        return zero, fmt.Errorf("no handler for %s", typeName)
    }

    h, ok := handler.(Handler[Q, R])
    if !ok {
        var zero R
        return zero, fmt.Errorf("handler type mismatch for %s", typeName)
    }

    return h.Execute(ctx, q)
}

type Handler[Q any, R any] interface {
    Execute(ctx context.Context, query Q) (R, error)
}

Query Bus - Python

# src/orderflow/application/queries/bus.py
from typing import Any
from .query import Query, QueryHandler

class QueryBus:
    def __init__(self) -> None:
        self._handlers: dict[type[Query[Any]], QueryHandler[Any, Any]] = {}

    def register[Q: Query[R], R](
        self,
        query_type: type[Q],
        handler: QueryHandler[Q, R]
    ) -> None:
        self._handlers[query_type] = handler

    async def execute[Q: Query[R], R](self, query: Q) -> R:
        handler = self._handlers.get(type(query))
        if handler is None:
            raise ValueError(f"No handler for {type(query).__name__}")
        return await handler.execute(query)

Query Handlers: Consultando el Read Model

Los Query Handlers son muy simples: reciben la query, consultan el repositorio de lectura y retornan el resultado.

GetOrderById Handler - TypeScript

// src/application/queries/order/get-order-by-id.handler.ts
import { QueryHandler } from "../query";
import { GetOrderByIdQuery } from "./get-order-by-id.query";
import { OrderDetailView } from "./order.views";
import { OrderReadRepository } from "@infrastructure/projections/order-read.repository";

export class GetOrderByIdHandler
  implements QueryHandler<GetOrderByIdQuery, OrderDetailView | null> {

  constructor(private readonly repository: OrderReadRepository) {}

  async execute(query: GetOrderByIdQuery): Promise<OrderDetailView | null> {
    return this.repository.findById(query.orderId);
  }
}

SearchOrders Handler - TypeScript

Para busquedas mas complejas, el handler traduce los parametros de la query a criterios de busqueda:

// src/application/queries/order/search-orders.handler.ts
import { QueryHandler } from "../query";
import { SearchOrdersQuery } from "./search-orders.query";
import { OrderListView, PaginatedResult } from "./order.views";
import { OrderReadRepository } from "@infrastructure/projections/order-read.repository";

export class SearchOrdersHandler
  implements QueryHandler<SearchOrdersQuery, PaginatedResult<OrderListView>> {

  constructor(private readonly repository: OrderReadRepository) {}

  async execute(query: SearchOrdersQuery): Promise<PaginatedResult<OrderListView>> {
    return this.repository.search({
      customerId: query.customerId,
      status: query.status,
      fromDate: query.fromDate,
      toDate: query.toDate,
      page: query.page,
      pageSize: query.pageSize
    });
  }
}

Handlers - Go

// internal/application/query/get_order_handler.go
package query

import (
    "context"

    "github.com/company/orderflow/internal/infrastructure/read"
)

type GetOrderByIDHandler struct {
    repo read.OrderRepository
}

func NewGetOrderByIDHandler(repo read.OrderRepository) *GetOrderByIDHandler {
    return &GetOrderByIDHandler{repo: repo}
}

func (h *GetOrderByIDHandler) Execute(
    ctx context.Context,
    q GetOrderByIDQuery,
) (*OrderDetailView, error) {
    return h.repo.FindByID(ctx, q.OrderID)
}

type SearchOrdersHandler struct {
    repo read.OrderRepository
}

func NewSearchOrdersHandler(repo read.OrderRepository) *SearchOrdersHandler {
    return &SearchOrdersHandler{repo: repo}
}

func (h *SearchOrdersHandler) Execute(
    ctx context.Context,
    q SearchOrdersQuery,
) (*PaginatedResult[OrderListView], error) {
    return h.repo.Search(ctx, read.SearchCriteria{
        CustomerID: q.CustomerID,
        Status:     q.Status,
        FromDate:   q.FromDate,
        ToDate:     q.ToDate,
        Page:       q.Page,
        PageSize:   q.PageSize,
    })
}

Handlers - Python

# src/orderflow/application/queries/order/get_order_handler.py
from ..query import QueryHandler
from .get_order_query import GetOrderByIdQuery
from .views import OrderDetailView
from ....infrastructure.read.order_repository import OrderReadRepository

class GetOrderByIdHandler(QueryHandler[GetOrderByIdQuery, OrderDetailView | None]):
    def __init__(self, repository: OrderReadRepository) -> None:
        self._repository = repository

    async def execute(self, query: GetOrderByIdQuery) -> OrderDetailView | None:
        return await self._repository.find_by_id(query.order_id)

class SearchOrdersHandler(QueryHandler[SearchOrdersQuery, PaginatedResult[OrderListView]]):
    def __init__(self, repository: OrderReadRepository) -> None:
        self._repository = repository

    async def execute(
        self, query: SearchOrdersQuery
    ) -> PaginatedResult[OrderListView]:
        return await self._repository.search(
            customer_id=query.customer_id,
            status=query.status,
            from_date=query.from_date,
            to_date=query.to_date,
            page=query.page,
            page_size=query.page_size
        )

Composicion y Uso en la API

Veamos como conectar todo y usar el Query Bus en endpoints REST:

// TypeScript
const queryBus = new QueryBus();
const orderReadRepo = new ElasticsearchOrderReadRepository(esClient);

queryBus.register(GetOrderByIdQuery, new GetOrderByIdHandler(orderReadRepo));
queryBus.register(SearchOrdersQuery, new SearchOrdersHandler(orderReadRepo));

// Uso en API
app.get("/orders/:id", async (req, res) => {
  const query = new GetOrderByIdQuery(req.params.id);
  const order = await queryBus.execute(query);

  if (!order) return res.status(404).json({ error: "Not found" });
  res.json(order);
});

app.get("/orders", async (req, res) => {
  const query = new SearchOrdersQuery(
    req.query.customerId,
    req.query.status,
    req.query.fromDate ? new Date(req.query.fromDate) : undefined,
    req.query.toDate ? new Date(req.query.toDate) : undefined,
    parseInt(req.query.page || "1"),
    parseInt(req.query.pageSize || "20")
  );

  const result = await queryBus.execute(query);
  res.json(result);
});

Proximos Pasos

En el siguiente capitulo implementaremos Read Models con Elasticsearch.


Glosario

Query Bus

Definicion: Componente que recibe queries y las despacha a sus handlers correspondientes, retornando los datos solicitados.

Por que es importante: Desacopla la solicitud de datos de su obtencion. Permite agregar funcionalidad transversal como cache, logging o autorizacion.

Ejemplo practico: queryBus.execute(new GetOrderByIdQuery("123")) envia la query al handler que consulta Elasticsearch y retorna el pedido.


Execute (Ejecutar)

Definicion: Metodo del Query Bus que recibe una query y retorna su resultado. Es el equivalente de dispatch para commands, pero retorna datos.

Por que es importante: Es el punto de entrada unificado para todas las consultas del sistema.

Ejemplo practico: const order = await queryBus.execute(query) ejecuta la query y obtiene el resultado tipado.


Search Criteria (Criterios de Busqueda)

Definicion: Objeto que encapsula los parametros de filtrado, ordenamiento y paginacion para una busqueda.

Por que es importante: Estandariza como se expresan las condiciones de busqueda. Facilita construir queries dinamicas.

Ejemplo practico: SearchCriteria puede incluir customerId, status, fromDate, toDate, page y pageSize, todos opcionales.


Read Repository (Repositorio de Lectura)

Definicion: Repositorio especializado en consultas al Read Model. Tiene metodos optimizados para busqueda y recuperacion de datos.

Por que es importante: Es diferente del Write Repository. Sus metodos retornan Views/DTOs optimizados para lectura, no agregados.

Ejemplo practico: OrderReadRepository tiene findById(), search() y findByCustomer(), todos retornando OrderView o listas de views.


Type Inference (Inferencia de Tipos)

Definicion: Capacidad del compilador/sistema de tipos de deducir automaticamente el tipo de una variable o retorno basandose en el contexto.

Por que es importante: El Query Bus usa genericos para que el tipo de retorno de execute() coincida con lo que la Query promete, sin casting manual.

Ejemplo practico: Si GetOrderByIdQuery implements Query<OrderDetailView | null>, entonces queryBus.execute(query) retorna Promise<OrderDetailView | null> automaticamente.