← Volver al listado de tecnologías

Pub/Sub en Valkey

Por: Artiko
valkeypubsubmensajeriatiempo-real

Pub/Sub (Publish/Subscribe)

Conceptos básicos

Pub/Sub permite comunicación asíncrona entre procesos:

Publisher → Canal → Subscriber 1
                  → Subscriber 2
                  → Subscriber N

Suscripción a canales

Terminal del suscriptor

# Suscribirse a un canal
SUBSCRIBE noticias
# Reading messages... (press Ctrl-C to quit)
# 1) "subscribe"
# 2) "noticias"
# 3) (integer) 1

# Suscribirse a múltiples canales
SUBSCRIBE noticias deportes clima

# Suscribirse por patrón
PSUBSCRIBE news:*
PSUBSCRIBE user:*:events

Terminal del publisher

# Publicar mensaje
PUBLISH noticias "Nueva noticia importante"
# (integer) 2  <- número de suscriptores que recibieron

PUBLISH news:tech "Nuevo lanzamiento de software"

Patrones avanzados

Wildcards con PSUBSCRIBE

# Patrones soportados
PSUBSCRIBE h?llo      # hello, hallo, hxllo
PSUBSCRIBE h*llo      # hllo, heeeello
PSUBSCRIBE h[ae]llo   # hello, hallo

# Ejemplo práctico
PSUBSCRIBE chat:room:*          # Todos los rooms
PSUBSCRIBE events:user:*:login  # Logins de usuarios

Desuscribirse

# Desuscribirse de canal específico
UNSUBSCRIBE noticias

# Desuscribirse de patrón
PUNSUBSCRIBE news:*

# Desuscribirse de todo
UNSUBSCRIBE
PUNSUBSCRIBE

Información de Pub/Sub

# Canales activos
PUBSUB CHANNELS
PUBSUB CHANNELS news:*

# Número de suscriptores por canal
PUBSUB NUMSUB noticias deportes

# Número de suscripciones por patrón
PUBSUB NUMPAT

Implementación en Python

import valkey
import threading

# Conexión
r = valkey.Valkey(host='localhost', port=6379, decode_responses=True)

# Suscriptor
def subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe('noticias')

    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Recibido: {message['data']}")

# Iniciar en thread separado
thread = threading.Thread(target=subscriber)
thread.daemon = True
thread.start()

# Publisher
r.publish('noticias', 'Hola desde Python!')

Con patrón

def subscriber_pattern():
    pubsub = r.pubsub()
    pubsub.psubscribe('events:*')

    for message in pubsub.listen():
        if message['type'] == 'pmessage':
            print(f"Canal: {message['channel']}")
            print(f"Mensaje: {message['data']}")

Implementación en Node.js

import { createClient } from 'valkey';

const subscriber = createClient();
const publisher = createClient();

await subscriber.connect();
await publisher.connect();

// Suscribirse
await subscriber.subscribe('noticias', (message) => {
    console.log('Recibido:', message);
});

// Con patrón
await subscriber.pSubscribe('events:*', (message, channel) => {
    console.log(`Canal ${channel}: ${message}`);
});

// Publicar
await publisher.publish('noticias', 'Mensaje desde Node.js');

Casos de uso

Chat en tiempo real

# Estructura de canales
chat:room:general
chat:room:dev
chat:user:123:direct

# Usuario se une a room
SUBSCRIBE chat:room:general

# Enviar mensaje
PUBLISH chat:room:general '{"user":"juan","msg":"Hola!"}'

Notificaciones

# Servicio de notificaciones
def enviar_notificacion(user_id, mensaje):
    canal = f"notifications:user:{user_id}"
    r.publish(canal, json.dumps({
        'tipo': 'notificacion',
        'mensaje': mensaje,
        'timestamp': time.time()
    }))

# Cliente suscrito
pubsub.psubscribe('notifications:user:42')

Invalidación de cache

# Cuando se actualiza un producto
def actualizar_producto(producto_id, datos):
    # Actualizar en DB
    db.productos.update(producto_id, datos)

    # Notificar para invalidar cache
    r.publish('cache:invalidate', f'producto:{producto_id}')

# Servidores de cache suscritos
def cache_invalidator():
    pubsub.subscribe('cache:invalidate')
    for msg in pubsub.listen():
        if msg['type'] == 'message':
            cache.delete(msg['data'])

Sistema de eventos

# Publicar eventos del sistema
def emit_event(event_type, data):
    r.publish(f'events:{event_type}', json.dumps(data))

emit_event('user:login', {'user_id': 42, 'ip': '192.168.1.1'})
emit_event('order:created', {'order_id': 1001, 'total': 99.99})

# Consumidores específicos
pubsub.psubscribe('events:order:*')  # Solo órdenes
pubsub.subscribe('events:user:login')  # Solo logins

Limitaciones

Para mensajería persistente, considera Streams (ver capítulo siguiente).

Streams vs Pub/Sub

CaracterísticaPub/SubStreams
PersistenciaNo
ReplayNo
Consumer groupsNo
AcknowledgmentNo
VelocidadMás rápidoRápido

Ejercicios

  1. Implementa un chat simple con Pub/Sub
  2. Crea un sistema de notificaciones en tiempo real
  3. Implementa invalidación de cache distribuida

Resumen