← Volver al listado de tecnologías
Pub/Sub en Valkey
Pub/Sub (Publish/Subscribe)
Conceptos básicos
Pub/Sub permite comunicación asíncrona entre procesos:
- Publisher: Envía mensajes a canales
- Subscriber: Recibe mensajes de canales suscritos
- Canal: Nombre lógico para agrupar mensajes
Publisher → Canal → Subscriber 1
→ Subscriber 2
→ Subscriber N
Suscripción a canales
Terminal del suscriptor
# Suscribirse a un canal
SUBSCRIBE noticias
# Reading messages... (press Ctrl-C to quit)
# 1) "subscribe"
# 2) "noticias"
# 3) (integer) 1
# Suscribirse a múltiples canales
SUBSCRIBE noticias deportes clima
# Suscribirse por patrón
PSUBSCRIBE news:*
PSUBSCRIBE user:*:events
Terminal del publisher
# Publicar mensaje
PUBLISH noticias "Nueva noticia importante"
# (integer) 2 <- número de suscriptores que recibieron
PUBLISH news:tech "Nuevo lanzamiento de software"
Patrones avanzados
Wildcards con PSUBSCRIBE
# Patrones soportados
PSUBSCRIBE h?llo # hello, hallo, hxllo
PSUBSCRIBE h*llo # hllo, heeeello
PSUBSCRIBE h[ae]llo # hello, hallo
# Ejemplo práctico
PSUBSCRIBE chat:room:* # Todos los rooms
PSUBSCRIBE events:user:*:login # Logins de usuarios
Desuscribirse
# Desuscribirse de canal específico
UNSUBSCRIBE noticias
# Desuscribirse de patrón
PUNSUBSCRIBE news:*
# Desuscribirse de todo
UNSUBSCRIBE
PUNSUBSCRIBE
Información de Pub/Sub
# Canales activos
PUBSUB CHANNELS
PUBSUB CHANNELS news:*
# Número de suscriptores por canal
PUBSUB NUMSUB noticias deportes
# Número de suscripciones por patrón
PUBSUB NUMPAT
Implementación en Python
import valkey
import threading
# Conexión
r = valkey.Valkey(host='localhost', port=6379, decode_responses=True)
# Suscriptor
def subscriber():
pubsub = r.pubsub()
pubsub.subscribe('noticias')
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Recibido: {message['data']}")
# Iniciar en thread separado
thread = threading.Thread(target=subscriber)
thread.daemon = True
thread.start()
# Publisher
r.publish('noticias', 'Hola desde Python!')
Con patrón
def subscriber_pattern():
pubsub = r.pubsub()
pubsub.psubscribe('events:*')
for message in pubsub.listen():
if message['type'] == 'pmessage':
print(f"Canal: {message['channel']}")
print(f"Mensaje: {message['data']}")
Implementación en Node.js
import { createClient } from 'valkey';
const subscriber = createClient();
const publisher = createClient();
await subscriber.connect();
await publisher.connect();
// Suscribirse
await subscriber.subscribe('noticias', (message) => {
console.log('Recibido:', message);
});
// Con patrón
await subscriber.pSubscribe('events:*', (message, channel) => {
console.log(`Canal ${channel}: ${message}`);
});
// Publicar
await publisher.publish('noticias', 'Mensaje desde Node.js');
Casos de uso
Chat en tiempo real
# Estructura de canales
chat:room:general
chat:room:dev
chat:user:123:direct
# Usuario se une a room
SUBSCRIBE chat:room:general
# Enviar mensaje
PUBLISH chat:room:general '{"user":"juan","msg":"Hola!"}'
Notificaciones
# Servicio de notificaciones
def enviar_notificacion(user_id, mensaje):
canal = f"notifications:user:{user_id}"
r.publish(canal, json.dumps({
'tipo': 'notificacion',
'mensaje': mensaje,
'timestamp': time.time()
}))
# Cliente suscrito
pubsub.psubscribe('notifications:user:42')
Invalidación de cache
# Cuando se actualiza un producto
def actualizar_producto(producto_id, datos):
# Actualizar en DB
db.productos.update(producto_id, datos)
# Notificar para invalidar cache
r.publish('cache:invalidate', f'producto:{producto_id}')
# Servidores de cache suscritos
def cache_invalidator():
pubsub.subscribe('cache:invalidate')
for msg in pubsub.listen():
if msg['type'] == 'message':
cache.delete(msg['data'])
Sistema de eventos
# Publicar eventos del sistema
def emit_event(event_type, data):
r.publish(f'events:{event_type}', json.dumps(data))
emit_event('user:login', {'user_id': 42, 'ip': '192.168.1.1'})
emit_event('order:created', {'order_id': 1001, 'total': 99.99})
# Consumidores específicos
pubsub.psubscribe('events:order:*') # Solo órdenes
pubsub.subscribe('events:user:login') # Solo logins
Limitaciones
- No persistencia: Mensajes se pierden si no hay suscriptores
- No acknowledgment: No hay confirmación de recepción
- No replay: No se pueden recuperar mensajes pasados
Para mensajería persistente, considera Streams (ver capítulo siguiente).
Streams vs Pub/Sub
| Característica | Pub/Sub | Streams |
|---|---|---|
| Persistencia | No | Sí |
| Replay | No | Sí |
| Consumer groups | No | Sí |
| Acknowledgment | No | Sí |
| Velocidad | Más rápido | Rápido |
Ejercicios
- Implementa un chat simple con Pub/Sub
- Crea un sistema de notificaciones en tiempo real
- Implementa invalidación de cache distribuida
Resumen
SUBSCRIBE/PSUBSCRIBEpara recibir mensajesPUBLISHpara enviar mensajes- Los patrones permiten suscripciones flexibles
- Ideal para comunicación en tiempo real sin persistencia
- Para persistencia, usar Streams