Query Bus y Handlers
Query Bus y Handlers
En este capitulo implementaremos el Query Bus, el componente que conecta las consultas con sus handlers. Veremos como estructurar queries especificas y handlers eficientes.
Que es el Query Bus
El Query Bus funciona de manera similar al Command Bus, pero para consultas. Recibe una query, encuentra el handler registrado y retorna los datos solicitados.
La diferencia principal es que las queries siempre retornan datos (a diferencia de los commands que tipicamente retornan void).
Implementacion del Query Bus
Query Bus - TypeScript
El Query Bus usa genericos para garantizar que el tipo de retorno coincida con lo que la query promete:
// src/application/queries/query-bus.ts
import { Query, QueryHandler } from "./query";
type QueryClass<T extends Query<unknown>> = new (...args: unknown[]) => T;
type ExtractResult<T> = T extends Query<infer R> ? R : never;
export class QueryBus {
private handlers = new Map<string, QueryHandler<Query<unknown>, unknown>>();
register<T extends Query<R>, R>(
queryClass: QueryClass<T>,
handler: QueryHandler<T, R>
): void {
this.handlers.set(
queryClass.name,
handler as QueryHandler<Query<unknown>, unknown>
);
}
async execute<T extends Query<R>, R = ExtractResult<T>>(query: T): Promise<R> {
const handler = this.handlers.get(query.constructor.name);
if (!handler) {
throw new Error(`No handler for ${query.constructor.name}`);
}
return handler.execute(query) as Promise<R>;
}
}
Query Bus - Go
// internal/application/query/bus.go
package query
import (
"context"
"fmt"
"reflect"
)
type Bus struct {
handlers map[string]any
}
func NewBus() *Bus {
return &Bus{handlers: make(map[string]any)}
}
func Register[Q any, R any](b *Bus, handler Handler[Q, R]) {
typeName := reflect.TypeOf((*Q)(nil)).Elem().Name()
b.handlers[typeName] = handler
}
func Execute[Q any, R any](b *Bus, ctx context.Context, q Q) (R, error) {
typeName := reflect.TypeOf(q).Name()
handler, ok := b.handlers[typeName]
if !ok {
var zero R
return zero, fmt.Errorf("no handler for %s", typeName)
}
h, ok := handler.(Handler[Q, R])
if !ok {
var zero R
return zero, fmt.Errorf("handler type mismatch for %s", typeName)
}
return h.Execute(ctx, q)
}
type Handler[Q any, R any] interface {
Execute(ctx context.Context, query Q) (R, error)
}
Query Bus - Python
# src/orderflow/application/queries/bus.py
from typing import Any
from .query import Query, QueryHandler
class QueryBus:
def __init__(self) -> None:
self._handlers: dict[type[Query[Any]], QueryHandler[Any, Any]] = {}
def register[Q: Query[R], R](
self,
query_type: type[Q],
handler: QueryHandler[Q, R]
) -> None:
self._handlers[query_type] = handler
async def execute[Q: Query[R], R](self, query: Q) -> R:
handler = self._handlers.get(type(query))
if handler is None:
raise ValueError(f"No handler for {type(query).__name__}")
return await handler.execute(query)
Query Handlers: Consultando el Read Model
Los Query Handlers son muy simples: reciben la query, consultan el repositorio de lectura y retornan el resultado.
GetOrderById Handler - TypeScript
// src/application/queries/order/get-order-by-id.handler.ts
import { QueryHandler } from "../query";
import { GetOrderByIdQuery } from "./get-order-by-id.query";
import { OrderDetailView } from "./order.views";
import { OrderReadRepository } from "@infrastructure/projections/order-read.repository";
export class GetOrderByIdHandler
implements QueryHandler<GetOrderByIdQuery, OrderDetailView | null> {
constructor(private readonly repository: OrderReadRepository) {}
async execute(query: GetOrderByIdQuery): Promise<OrderDetailView | null> {
return this.repository.findById(query.orderId);
}
}
SearchOrders Handler - TypeScript
Para busquedas mas complejas, el handler traduce los parametros de la query a criterios de busqueda:
// src/application/queries/order/search-orders.handler.ts
import { QueryHandler } from "../query";
import { SearchOrdersQuery } from "./search-orders.query";
import { OrderListView, PaginatedResult } from "./order.views";
import { OrderReadRepository } from "@infrastructure/projections/order-read.repository";
export class SearchOrdersHandler
implements QueryHandler<SearchOrdersQuery, PaginatedResult<OrderListView>> {
constructor(private readonly repository: OrderReadRepository) {}
async execute(query: SearchOrdersQuery): Promise<PaginatedResult<OrderListView>> {
return this.repository.search({
customerId: query.customerId,
status: query.status,
fromDate: query.fromDate,
toDate: query.toDate,
page: query.page,
pageSize: query.pageSize
});
}
}
Handlers - Go
// internal/application/query/get_order_handler.go
package query
import (
"context"
"github.com/company/orderflow/internal/infrastructure/read"
)
type GetOrderByIDHandler struct {
repo read.OrderRepository
}
func NewGetOrderByIDHandler(repo read.OrderRepository) *GetOrderByIDHandler {
return &GetOrderByIDHandler{repo: repo}
}
func (h *GetOrderByIDHandler) Execute(
ctx context.Context,
q GetOrderByIDQuery,
) (*OrderDetailView, error) {
return h.repo.FindByID(ctx, q.OrderID)
}
type SearchOrdersHandler struct {
repo read.OrderRepository
}
func NewSearchOrdersHandler(repo read.OrderRepository) *SearchOrdersHandler {
return &SearchOrdersHandler{repo: repo}
}
func (h *SearchOrdersHandler) Execute(
ctx context.Context,
q SearchOrdersQuery,
) (*PaginatedResult[OrderListView], error) {
return h.repo.Search(ctx, read.SearchCriteria{
CustomerID: q.CustomerID,
Status: q.Status,
FromDate: q.FromDate,
ToDate: q.ToDate,
Page: q.Page,
PageSize: q.PageSize,
})
}
Handlers - Python
# src/orderflow/application/queries/order/get_order_handler.py
from ..query import QueryHandler
from .get_order_query import GetOrderByIdQuery
from .views import OrderDetailView
from ....infrastructure.read.order_repository import OrderReadRepository
class GetOrderByIdHandler(QueryHandler[GetOrderByIdQuery, OrderDetailView | None]):
def __init__(self, repository: OrderReadRepository) -> None:
self._repository = repository
async def execute(self, query: GetOrderByIdQuery) -> OrderDetailView | None:
return await self._repository.find_by_id(query.order_id)
class SearchOrdersHandler(QueryHandler[SearchOrdersQuery, PaginatedResult[OrderListView]]):
def __init__(self, repository: OrderReadRepository) -> None:
self._repository = repository
async def execute(
self, query: SearchOrdersQuery
) -> PaginatedResult[OrderListView]:
return await self._repository.search(
customer_id=query.customer_id,
status=query.status,
from_date=query.from_date,
to_date=query.to_date,
page=query.page,
page_size=query.page_size
)
Composicion y Uso en la API
Veamos como conectar todo y usar el Query Bus en endpoints REST:
// TypeScript
const queryBus = new QueryBus();
const orderReadRepo = new ElasticsearchOrderReadRepository(esClient);
queryBus.register(GetOrderByIdQuery, new GetOrderByIdHandler(orderReadRepo));
queryBus.register(SearchOrdersQuery, new SearchOrdersHandler(orderReadRepo));
// Uso en API
app.get("/orders/:id", async (req, res) => {
const query = new GetOrderByIdQuery(req.params.id);
const order = await queryBus.execute(query);
if (!order) return res.status(404).json({ error: "Not found" });
res.json(order);
});
app.get("/orders", async (req, res) => {
const query = new SearchOrdersQuery(
req.query.customerId,
req.query.status,
req.query.fromDate ? new Date(req.query.fromDate) : undefined,
req.query.toDate ? new Date(req.query.toDate) : undefined,
parseInt(req.query.page || "1"),
parseInt(req.query.pageSize || "20")
);
const result = await queryBus.execute(query);
res.json(result);
});
Proximos Pasos
En el siguiente capitulo implementaremos Read Models con Elasticsearch.
Glosario
Query Bus
Definicion: Componente que recibe queries y las despacha a sus handlers correspondientes, retornando los datos solicitados.
Por que es importante: Desacopla la solicitud de datos de su obtencion. Permite agregar funcionalidad transversal como cache, logging o autorizacion.
Ejemplo practico: queryBus.execute(new GetOrderByIdQuery("123")) envia la query al handler que consulta Elasticsearch y retorna el pedido.
Execute (Ejecutar)
Definicion: Metodo del Query Bus que recibe una query y retorna su resultado. Es el equivalente de dispatch para commands, pero retorna datos.
Por que es importante: Es el punto de entrada unificado para todas las consultas del sistema.
Ejemplo practico: const order = await queryBus.execute(query) ejecuta la query y obtiene el resultado tipado.
Search Criteria (Criterios de Busqueda)
Definicion: Objeto que encapsula los parametros de filtrado, ordenamiento y paginacion para una busqueda.
Por que es importante: Estandariza como se expresan las condiciones de busqueda. Facilita construir queries dinamicas.
Ejemplo practico: SearchCriteria puede incluir customerId, status, fromDate, toDate, page y pageSize, todos opcionales.
Read Repository (Repositorio de Lectura)
Definicion: Repositorio especializado en consultas al Read Model. Tiene metodos optimizados para busqueda y recuperacion de datos.
Por que es importante: Es diferente del Write Repository. Sus metodos retornan Views/DTOs optimizados para lectura, no agregados.
Ejemplo practico: OrderReadRepository tiene findById(), search() y findByCustomer(), todos retornando OrderView o listas de views.
Type Inference (Inferencia de Tipos)
Definicion: Capacidad del compilador/sistema de tipos de deducir automaticamente el tipo de una variable o retorno basandose en el contexto.
Por que es importante: El Query Bus usa genericos para que el tipo de retorno de execute() coincida con lo que la Query promete, sin casting manual.
Ejemplo practico: Si GetOrderByIdQuery implements Query<OrderDetailView | null>, entonces queryBus.execute(query) retorna Promise<OrderDetailView | null> automaticamente.