← Volver al listado de tecnologías
Proyecto Final: Sistema de Cache y Sesiones
Proyecto Final
Descripción
Construiremos un sistema de backend con:
- Cache de API con invalidación inteligente
- Gestión de sesiones de usuario
- Rate limiting por usuario
- Cola de tareas asíncronas
- Notificaciones en tiempo real
Arquitectura
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Cliente │────▶│ API │────▶│ Valkey │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
▼ │
┌─────────────┐ │
│ Database │◀───────────┘
└─────────────┘ (cache)
Estructura del proyecto
proyecto/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── valkey_client.py
│ ├── cache.py
│ ├── sessions.py
│ ├── rate_limiter.py
│ ├── task_queue.py
│ └── notifications.py
├── docker-compose.yml
└── requirements.txt
Configuración
docker-compose.yml
services:
valkey:
image: valkey/valkey:latest
ports:
- "6379:6379"
volumes:
- valkey_data:/data
command: valkey-server --appendonly yes
api:
build: .
ports:
- "8000:8000"
depends_on:
- valkey
environment:
- VALKEY_URL=valkey://valkey:6379
volumes:
valkey_data:
config.py
import os
VALKEY_URL = os.getenv('VALKEY_URL', 'valkey://localhost:6379')
CACHE_TTL = 300
SESSION_TTL = 3600
RATE_LIMIT = 100
RATE_WINDOW = 60
valkey_client.py
import valkey
from config import VALKEY_URL
pool = valkey.ConnectionPool.from_url(
VALKEY_URL,
max_connections=20,
decode_responses=True
)
def get_client():
return valkey.Valkey(connection_pool=pool)
r = get_client()
Módulo de Cache
cache.py
import json
import hashlib
from functools import wraps
from valkey_client import r
from config import CACHE_TTL
def cache_key(*args, **kwargs):
"""Genera clave única para argumentos."""
data = f"{args}:{sorted(kwargs.items())}"
return hashlib.md5(data.encode()).hexdigest()
def cached(prefix, ttl=CACHE_TTL):
"""Decorator para cachear resultados."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
key = f"cache:{prefix}:{cache_key(*args, **kwargs)}"
# Intentar obtener de cache
cached = r.get(key)
if cached:
return json.loads(cached)
# Ejecutar función
result = func(*args, **kwargs)
# Guardar en cache
r.setex(key, ttl, json.dumps(result))
return result
return wrapper
return decorator
def invalidate(prefix, *args, **kwargs):
"""Invalida una entrada específica."""
key = f"cache:{prefix}:{cache_key(*args, **kwargs)}"
r.delete(key)
def invalidate_pattern(pattern):
"""Invalida todas las claves que coincidan."""
cursor = 0
while True:
cursor, keys = r.scan(cursor, match=f"cache:{pattern}*", count=100)
if keys:
r.delete(*keys)
if cursor == 0:
break
# Uso
@cached('usuarios')
def get_user(user_id):
# Consulta a base de datos
return {'id': user_id, 'name': 'Usuario'}
Módulo de Sesiones
sessions.py
import json
import secrets
from valkey_client import r
from config import SESSION_TTL
class SessionManager:
PREFIX = 'session'
def create(self, user_id, data=None):
"""Crea nueva sesión."""
token = secrets.token_urlsafe(32)
session_data = {
'user_id': user_id,
'data': data or {}
}
key = f"{self.PREFIX}:{token}"
r.setex(key, SESSION_TTL, json.dumps(session_data))
# Índice de sesiones por usuario
r.sadd(f"user_sessions:{user_id}", token)
return token
def get(self, token):
"""Obtiene datos de sesión."""
key = f"{self.PREFIX}:{token}"
data = r.get(key)
if data:
# Renovar TTL
r.expire(key, SESSION_TTL)
return json.loads(data)
return None
def update(self, token, data):
"""Actualiza datos de sesión."""
session = self.get(token)
if session:
session['data'].update(data)
key = f"{self.PREFIX}:{token}"
r.setex(key, SESSION_TTL, json.dumps(session))
return True
return False
def destroy(self, token):
"""Destruye sesión."""
session = self.get(token)
if session:
r.delete(f"{self.PREFIX}:{token}")
r.srem(f"user_sessions:{session['user_id']}", token)
return True
return False
def destroy_all(self, user_id):
"""Destruye todas las sesiones de un usuario."""
tokens = r.smembers(f"user_sessions:{user_id}")
for token in tokens:
r.delete(f"{self.PREFIX}:{token}")
r.delete(f"user_sessions:{user_id}")
sessions = SessionManager()
Rate Limiter
rate_limiter.py
from valkey_client import r
from config import RATE_LIMIT, RATE_WINDOW
# Script Lua para rate limiting atómico
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('GET', key)
if current == false then
redis.call('SET', key, 1, 'EX', window)
return {1, limit - 1, window}
end
if tonumber(current) >= limit then
local ttl = redis.call('TTL', key)
return {0, 0, ttl}
end
local new_count = redis.call('INCR', key)
return {1, limit - new_count, redis.call('TTL', key)}
"""
rate_limit_lua = r.register_script(RATE_LIMIT_SCRIPT)
class RateLimiter:
def __init__(self, limit=RATE_LIMIT, window=RATE_WINDOW):
self.limit = limit
self.window = window
def check(self, identifier):
"""
Verifica rate limit.
Retorna (allowed, remaining, reset_in)
"""
key = f"rate:{identifier}"
result = rate_limit_lua(keys=[key], args=[self.limit, self.window])
return {
'allowed': bool(result[0]),
'remaining': result[1],
'reset_in': result[2]
}
def reset(self, identifier):
"""Resetea el contador."""
r.delete(f"rate:{identifier}")
rate_limiter = RateLimiter()
Cola de Tareas
task_queue.py
import json
import uuid
from datetime import datetime
from valkey_client import r
class TaskQueue:
def __init__(self, name='default'):
self.queue_key = f"queue:{name}"
self.processing_key = f"queue:{name}:processing"
self.results_key = f"queue:{name}:results"
def enqueue(self, task_type, payload, priority=0):
"""Encola una tarea."""
task_id = str(uuid.uuid4())
task = {
'id': task_id,
'type': task_type,
'payload': payload,
'created_at': datetime.now().isoformat(),
'status': 'pending'
}
# Sorted set con prioridad
r.zadd(self.queue_key, {json.dumps(task): priority})
return task_id
def dequeue(self, timeout=0):
"""Obtiene siguiente tarea."""
# Mover de cola a procesando atómicamente
result = r.bzpopmin(self.queue_key, timeout)
if result:
_, task_json, _ = result
task = json.loads(task_json)
task['status'] = 'processing'
r.hset(self.processing_key, task['id'], json.dumps(task))
return task
return None
def complete(self, task_id, result=None):
"""Marca tarea como completada."""
task_json = r.hget(self.processing_key, task_id)
if task_json:
task = json.loads(task_json)
task['status'] = 'completed'
task['result'] = result
task['completed_at'] = datetime.now().isoformat()
# Mover a resultados
r.hset(self.results_key, task_id, json.dumps(task))
r.hdel(self.processing_key, task_id)
# TTL para resultados
r.expire(self.results_key, 3600)
def fail(self, task_id, error):
"""Marca tarea como fallida."""
task_json = r.hget(self.processing_key, task_id)
if task_json:
task = json.loads(task_json)
task['status'] = 'failed'
task['error'] = str(error)
r.hset(self.results_key, task_id, json.dumps(task))
r.hdel(self.processing_key, task_id)
def get_result(self, task_id):
"""Obtiene resultado de tarea."""
result = r.hget(self.results_key, task_id)
return json.loads(result) if result else None
queue = TaskQueue()
Notificaciones
notifications.py
import json
from valkey_client import r
class NotificationService:
def __init__(self):
self.pubsub = r.pubsub()
def send(self, user_id, notification):
"""Envía notificación a usuario."""
channel = f"notifications:{user_id}"
r.publish(channel, json.dumps(notification))
# También guardar en lista para offline
r.lpush(f"notifications:pending:{user_id}", json.dumps(notification))
r.ltrim(f"notifications:pending:{user_id}", 0, 99)
def get_pending(self, user_id, limit=10):
"""Obtiene notificaciones pendientes."""
key = f"notifications:pending:{user_id}"
notifications = r.lrange(key, 0, limit - 1)
return [json.loads(n) for n in notifications]
def clear_pending(self, user_id):
"""Limpia notificaciones pendientes."""
r.delete(f"notifications:pending:{user_id}")
def subscribe(self, user_id, callback):
"""Suscribe a notificaciones en tiempo real."""
channel = f"notifications:{user_id}"
self.pubsub.subscribe(**{channel: callback})
return self.pubsub.run_in_thread(sleep_time=0.001)
notifications = NotificationService()
API Principal
main.py
from fastapi import FastAPI, HTTPException, Header
from cache import cached, invalidate
from sessions import sessions
from rate_limiter import rate_limiter
from task_queue import queue
from notifications import notifications
app = FastAPI()
# Middleware de rate limiting
@app.middleware("http")
async def rate_limit_middleware(request, call_next):
client_ip = request.client.host
result = rate_limiter.check(client_ip)
if not result['allowed']:
raise HTTPException(
status_code=429,
detail=f"Rate limit. Retry in {result['reset_in']}s"
)
response = await call_next(request)
response.headers['X-RateLimit-Remaining'] = str(result['remaining'])
return response
# Endpoints
@app.post("/login")
async def login(user_id: str):
token = sessions.create(user_id)
return {"token": token}
@app.get("/profile")
async def profile(authorization: str = Header()):
session = sessions.get(authorization)
if not session:
raise HTTPException(401, "Invalid session")
return get_user_profile(session['user_id'])
@cached('profile')
def get_user_profile(user_id):
# Simula consulta a DB
return {"id": user_id, "name": f"User {user_id}"}
@app.post("/tasks")
async def create_task(task_type: str, payload: dict):
task_id = queue.enqueue(task_type, payload)
return {"task_id": task_id}
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
result = queue.get_result(task_id)
if not result:
raise HTTPException(404, "Task not found")
return result
@app.post("/notify/{user_id}")
async def notify(user_id: str, message: str):
notifications.send(user_id, {"message": message})
return {"status": "sent"}
Ejecutar el proyecto
# Iniciar servicios
docker-compose up -d
# Instalar dependencias
pip install fastapi uvicorn valkey
# Ejecutar API
uvicorn main:app --reload
# Probar
curl -X POST http://localhost:8000/login?user_id=123
curl -H "Authorization: <token>" http://localhost:8000/profile
Resumen del tutorial
En este tutorial aprendimos:
- Introducción: Qué es Valkey y su instalación
- Tipos de datos: Strings, Hashes, Lists, Sets, Sorted Sets
- Comandos: Operaciones básicas y avanzadas
- Persistencia: RDB y AOF
- Pub/Sub: Mensajería en tiempo real
- Transacciones: MULTI/EXEC y WATCH
- Lua: Scripts atómicos
- Replicación: Master-Replica y Sentinel
- Clustering: Escalabilidad horizontal
- Seguridad: ACL y TLS
- Rendimiento: Optimización y benchmarks
- Clientes: Integración con lenguajes
- Proyecto: Aplicación completa