← Volver al listado de tecnologías

Command Bus y Handlers

Por: SiempreListo
cqrscommand-bushandlerstypescriptgopython

Command Bus y Handlers

En este capitulo implementaremos el Command Bus, el componente central que conecta los comandos con sus handlers. Aprenderemos como funciona el registro de handlers y el despacho de comandos.

Que es un Command Bus

El Command Bus (bus de comandos) es un componente que actua como intermediario entre quien envia un comando y quien lo procesa. Su trabajo es simple pero crucial:

  1. Recibe un comando
  2. Encuentra el handler registrado para ese tipo de comando
  3. Invoca al handler con el comando

El Command Bus desacopla al remitente del comando de su procesador. Quien envia el comando no necesita saber que handler lo procesa ni como encontrarlo.

Implementacion del Command Bus

Veamos como implementar el Command Bus en diferentes lenguajes.

Command Bus - TypeScript

El bus mantiene un mapa de handlers indexado por el nombre de la clase del comando:

// src/application/commands/command-bus.ts
import { Command, CommandHandler } from "./command";

type CommandClass<T extends Command> = new (...args: unknown[]) => T;

export class CommandBus {
  private handlers = new Map<string, CommandHandler<Command>>();

  register<T extends Command>(
    commandClass: CommandClass<T>,
    handler: CommandHandler<T>
  ): void {
    this.handlers.set(commandClass.name, handler as CommandHandler<Command>);
  }

  async dispatch<T extends Command>(command: T): Promise<void> {
    const handler = this.handlers.get(command.constructor.name);

    if (!handler) {
      throw new Error(`No handler for ${command.constructor.name}`);
    }

    await handler.execute(command);
  }
}

Command Bus - Go

// internal/application/command/bus.go
package command

import (
    "context"
    "fmt"
    "reflect"
)

type Bus struct {
    handlers map[string]any
}

func NewBus() *Bus {
    return &Bus{handlers: make(map[string]any)}
}

func Register[T any](b *Bus, handler Handler[T]) {
    typeName := reflect.TypeOf((*T)(nil)).Elem().Name()
    b.handlers[typeName] = handler
}

func (b *Bus) Dispatch(ctx context.Context, cmd any) error {
    typeName := reflect.TypeOf(cmd).Name()
    handler, ok := b.handlers[typeName]
    if !ok {
        return fmt.Errorf("no handler for %s", typeName)
    }

    // Type assertion basada en el comando
    switch h := handler.(type) {
    case Handler[CreateOrderCommand]:
        return h.Execute(ctx, cmd.(CreateOrderCommand))
    case Handler[ConfirmOrderCommand]:
        return h.Execute(ctx, cmd.(ConfirmOrderCommand))
    default:
        return fmt.Errorf("unknown handler type")
    }
}

type Handler[T any] interface {
    Execute(ctx context.Context, cmd T) error
}

Command Bus - Python

# src/orderflow/application/commands/bus.py
from typing import Any

from .command import Command, CommandHandler

class CommandBus:
    def __init__(self) -> None:
        self._handlers: dict[type[Command], CommandHandler[Any]] = {}

    def register[T: Command](
        self,
        command_type: type[T],
        handler: CommandHandler[T]
    ) -> None:
        self._handlers[command_type] = handler

    async def dispatch[T: Command](self, command: T) -> None:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise ValueError(f"No handler for {type(command).__name__}")
        await handler.execute(command)

Implementacion de Handlers

El Handler contiene la logica de procesamiento del comando. Recibe el comando, ejecuta la logica de negocio y persiste los cambios.

CreateOrder Handler - TypeScript

// src/application/commands/order/create-order.handler.ts
import { CommandHandler } from "../command";
import { CreateOrderCommand } from "./create-order.command";
import { OrderAggregate } from "@domain/order/order.aggregate";
import { OrderRepository } from "@domain/order/order.repository";
import { EventBus } from "@infrastructure/messaging/event-bus";

export class CreateOrderHandler implements CommandHandler<CreateOrderCommand> {
  constructor(
    private readonly repository: OrderRepository,
    private readonly eventBus: EventBus
  ) {}

  async execute(command: CreateOrderCommand): Promise<void> {
    const order = OrderAggregate.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress
    });

    await this.repository.save(order);

    const events = order.pullEvents();
    for (const event of events) {
      await this.eventBus.publish(event);
    }
  }
}

El handler sigue un patron claro:

  1. Crea o recupera el agregado
  2. Ejecuta la operacion de negocio
  3. Persiste el agregado
  4. Publica los eventos generados

CreateOrder Handler - Go

// internal/application/command/create_order_handler.go
package command

import (
    "context"

    "github.com/company/orderflow/internal/domain/order"
    "github.com/company/orderflow/internal/infrastructure/messaging"
)

type CreateOrderHandler struct {
    repo     order.Repository
    eventBus messaging.EventBus
}

func NewCreateOrderHandler(
    repo order.Repository,
    eventBus messaging.EventBus,
) *CreateOrderHandler {
    return &CreateOrderHandler{repo: repo, eventBus: eventBus}
}

func (h *CreateOrderHandler) Execute(
    ctx context.Context,
    cmd CreateOrderCommand,
) error {
    aggregate, err := order.Create(
        cmd.CustomerID,
        cmd.Items,
        cmd.ShippingAddress,
    )
    if err != nil {
        return err
    }

    if err := h.repo.Save(ctx, aggregate); err != nil {
        return err
    }

    for _, event := range aggregate.PullEvents() {
        if err := h.eventBus.Publish(ctx, event); err != nil {
            return err
        }
    }
    return nil
}

CreateOrder Handler - Python

# src/orderflow/application/commands/order/create_order_handler.py
from ..command import CommandHandler
from .create_order_command import CreateOrderCommand
from ....domain.order.order_aggregate import OrderAggregate
from ....domain.order.order_repository import OrderRepository
from ....infrastructure.messaging.event_bus import EventBus

class CreateOrderHandler(CommandHandler[CreateOrderCommand]):
    def __init__(
        self,
        repository: OrderRepository,
        event_bus: EventBus
    ) -> None:
        self._repository = repository
        self._event_bus = event_bus

    async def execute(self, command: CreateOrderCommand) -> None:
        order = OrderAggregate.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address
        )

        await self._repository.save(order)

        for event in order.pull_events():
            await self._event_bus.publish(event)

Registro y Uso: Conectando las Piezas

En el arranque de la aplicacion, registramos cada handler con el bus. Esto se hace tipicamente en la composicion o configuracion de dependencias:

// TypeScript - Composición
const commandBus = new CommandBus();
const orderRepository = new PostgresOrderRepository(pool);
const eventBus = new InMemoryEventBus();

commandBus.register(
  CreateOrderCommand,
  new CreateOrderHandler(orderRepository, eventBus)
);

// Uso
const command = new CreateOrderCommand(
  "customer-123",
  [{ productId: "prod-1", quantity: 2 }],
  { street: "123 Main St", city: "NYC", zip: "10001" }
);

await commandBus.dispatch(command);
// Go - Composición
bus := command.NewBus()
repo := postgres.NewOrderRepository(db)
eventBus := messaging.NewInMemoryEventBus()

command.Register(bus, command.NewCreateOrderHandler(repo, eventBus))

// Uso
cmd := command.NewCreateOrderCommand(
    "customer-123",
    []command.OrderItemDTO{{ProductID: "prod-1", Quantity: 2}},
    command.AddressDTO{Street: "123 Main St", City: "NYC", Zip: "10001"},
)

err := bus.Dispatch(ctx, cmd)
# Python - Composición
bus = CommandBus()
repository = PostgresOrderRepository(pool)
event_bus = InMemoryEventBus()

bus.register(CreateOrderCommand, CreateOrderHandler(repository, event_bus))

# Uso
command = CreateOrderCommand(
    customer_id="customer-123",
    items=(OrderItemDTO(product_id="prod-1", quantity=2),),
    shipping_address=AddressDTO(street="123 Main St", city="NYC", zip="10001")
)

await bus.dispatch(command)

Proximos Pasos

En el siguiente capitulo implementaremos validacion de comandos usando Zod.


Glosario

Bus (Message Bus)

Definicion: Componente que actua como intermediario para enviar mensajes (comandos, queries, eventos) a sus manejadores correspondientes. Desacopla al emisor del receptor.

Por que es importante: Permite agregar funcionalidad transversal (logging, autorizacion, metricas) sin modificar handlers. Facilita el testing al poder mockear el bus.

Ejemplo practico: El CommandBus recibe un CreateOrderCommand y lo envia al CreateOrderHandler sin que el codigo que lo invoca sepa cual handler existe.


Dispatch (Despachar)

Definicion: Accion de enviar un comando o query al bus para que sea procesado por su handler correspondiente.

Por que es importante: Es el punto de entrada para ejecutar operaciones. Todo el codigo de aplicacion “despacha” comandos en lugar de llamar handlers directamente.

Ejemplo practico: await commandBus.dispatch(new CreateOrderCommand(...)) envia el comando al bus, que lo direcciona al handler correcto.


Register (Registrar)

Definicion: Accion de asociar un tipo de comando o query con su handler en el bus. Se realiza al iniciar la aplicacion.

Por que es importante: Establece el mapeo que el bus usara para saber cual handler procesa cual comando. Sin registro, el bus no puede despachar.

Ejemplo practico: commandBus.register(CreateOrderCommand, new CreateOrderHandler(repo)) asocia el comando con su handler.


Event Bus (Bus de Eventos)

Definicion: Componente similar al Command Bus pero para eventos de dominio. Permite que multiples suscriptores reaccionen a un mismo evento.

Por que es importante: Es el mecanismo que conecta el Write Model con los Read Models. Cuando el agregado genera eventos, el Event Bus los distribuye a las proyecciones.

Ejemplo practico: Cuando se publica OrderCreatedEvent, el bus lo envia a OrderProjection que actualiza Elasticsearch, y a InventoryProjection que reserva stock.


Dependency Injection (Inyeccion de Dependencias)

Definicion: Patron donde las dependencias de un objeto (repositorios, servicios) se pasan como parametros en lugar de crearse internamente.

Por que es importante: Facilita el testing (puedes pasar mocks) y la configuracion (cambiar implementaciones sin modificar codigo).

Ejemplo practico: CreateOrderHandler recibe repository y eventBus en su constructor. En produccion recibe implementaciones reales; en tests, mocks.


Composition Root (Raiz de Composicion)

Definicion: Lugar en la aplicacion donde se crean todas las dependencias y se conectan entre si. Tipicamente al inicio de la aplicacion.

Por que es importante: Centraliza la configuracion de dependencias. Es el unico lugar que conoce las implementaciones concretas.

Ejemplo practico: En el archivo principal, creas el CommandBus, los repositorios y los handlers, y los conectas todos antes de iniciar el servidor.