← Volver al listado de tecnologías

Proyecto Final: Sistema de Cache y Sesiones

Por: Artiko
valkeyproyectocachesesionesapi

Proyecto Final

Descripción

Construiremos un sistema de backend con:

Arquitectura

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Cliente   │────▶│   API       │────▶│   Valkey    │
└─────────────┘     └─────────────┘     └─────────────┘
                           │                   │
                           ▼                   │
                    ┌─────────────┐            │
                    │  Database   │◀───────────┘
                    └─────────────┘      (cache)

Estructura del proyecto

proyecto/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── valkey_client.py
│   ├── cache.py
│   ├── sessions.py
│   ├── rate_limiter.py
│   ├── task_queue.py
│   └── notifications.py
├── docker-compose.yml
└── requirements.txt

Configuración

docker-compose.yml

services:
  valkey:
    image: valkey/valkey:latest
    ports:
      - "6379:6379"
    volumes:
      - valkey_data:/data
    command: valkey-server --appendonly yes

  api:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - valkey
    environment:
      - VALKEY_URL=valkey://valkey:6379

volumes:
  valkey_data:

config.py

import os

VALKEY_URL = os.getenv('VALKEY_URL', 'valkey://localhost:6379')
CACHE_TTL = 300
SESSION_TTL = 3600
RATE_LIMIT = 100
RATE_WINDOW = 60

valkey_client.py

import valkey
from config import VALKEY_URL

pool = valkey.ConnectionPool.from_url(
    VALKEY_URL,
    max_connections=20,
    decode_responses=True
)

def get_client():
    return valkey.Valkey(connection_pool=pool)

r = get_client()

Módulo de Cache

cache.py

import json
import hashlib
from functools import wraps
from valkey_client import r
from config import CACHE_TTL

def cache_key(*args, **kwargs):
    """Genera clave única para argumentos."""
    data = f"{args}:{sorted(kwargs.items())}"
    return hashlib.md5(data.encode()).hexdigest()

def cached(prefix, ttl=CACHE_TTL):
    """Decorator para cachear resultados."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = f"cache:{prefix}:{cache_key(*args, **kwargs)}"

            # Intentar obtener de cache
            cached = r.get(key)
            if cached:
                return json.loads(cached)

            # Ejecutar función
            result = func(*args, **kwargs)

            # Guardar en cache
            r.setex(key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

def invalidate(prefix, *args, **kwargs):
    """Invalida una entrada específica."""
    key = f"cache:{prefix}:{cache_key(*args, **kwargs)}"
    r.delete(key)

def invalidate_pattern(pattern):
    """Invalida todas las claves que coincidan."""
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor, match=f"cache:{pattern}*", count=100)
        if keys:
            r.delete(*keys)
        if cursor == 0:
            break

# Uso
@cached('usuarios')
def get_user(user_id):
    # Consulta a base de datos
    return {'id': user_id, 'name': 'Usuario'}

Módulo de Sesiones

sessions.py

import json
import secrets
from valkey_client import r
from config import SESSION_TTL

class SessionManager:
    PREFIX = 'session'

    def create(self, user_id, data=None):
        """Crea nueva sesión."""
        token = secrets.token_urlsafe(32)
        session_data = {
            'user_id': user_id,
            'data': data or {}
        }

        key = f"{self.PREFIX}:{token}"
        r.setex(key, SESSION_TTL, json.dumps(session_data))

        # Índice de sesiones por usuario
        r.sadd(f"user_sessions:{user_id}", token)

        return token

    def get(self, token):
        """Obtiene datos de sesión."""
        key = f"{self.PREFIX}:{token}"
        data = r.get(key)
        if data:
            # Renovar TTL
            r.expire(key, SESSION_TTL)
            return json.loads(data)
        return None

    def update(self, token, data):
        """Actualiza datos de sesión."""
        session = self.get(token)
        if session:
            session['data'].update(data)
            key = f"{self.PREFIX}:{token}"
            r.setex(key, SESSION_TTL, json.dumps(session))
            return True
        return False

    def destroy(self, token):
        """Destruye sesión."""
        session = self.get(token)
        if session:
            r.delete(f"{self.PREFIX}:{token}")
            r.srem(f"user_sessions:{session['user_id']}", token)
            return True
        return False

    def destroy_all(self, user_id):
        """Destruye todas las sesiones de un usuario."""
        tokens = r.smembers(f"user_sessions:{user_id}")
        for token in tokens:
            r.delete(f"{self.PREFIX}:{token}")
        r.delete(f"user_sessions:{user_id}")

sessions = SessionManager()

Rate Limiter

rate_limiter.py

from valkey_client import r
from config import RATE_LIMIT, RATE_WINDOW

# Script Lua para rate limiting atómico
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('GET', key)

if current == false then
    redis.call('SET', key, 1, 'EX', window)
    return {1, limit - 1, window}
end

if tonumber(current) >= limit then
    local ttl = redis.call('TTL', key)
    return {0, 0, ttl}
end

local new_count = redis.call('INCR', key)
return {1, limit - new_count, redis.call('TTL', key)}
"""

rate_limit_lua = r.register_script(RATE_LIMIT_SCRIPT)

class RateLimiter:
    def __init__(self, limit=RATE_LIMIT, window=RATE_WINDOW):
        self.limit = limit
        self.window = window

    def check(self, identifier):
        """
        Verifica rate limit.
        Retorna (allowed, remaining, reset_in)
        """
        key = f"rate:{identifier}"
        result = rate_limit_lua(keys=[key], args=[self.limit, self.window])
        return {
            'allowed': bool(result[0]),
            'remaining': result[1],
            'reset_in': result[2]
        }

    def reset(self, identifier):
        """Resetea el contador."""
        r.delete(f"rate:{identifier}")

rate_limiter = RateLimiter()

Cola de Tareas

task_queue.py

import json
import uuid
from datetime import datetime
from valkey_client import r

class TaskQueue:
    def __init__(self, name='default'):
        self.queue_key = f"queue:{name}"
        self.processing_key = f"queue:{name}:processing"
        self.results_key = f"queue:{name}:results"

    def enqueue(self, task_type, payload, priority=0):
        """Encola una tarea."""
        task_id = str(uuid.uuid4())
        task = {
            'id': task_id,
            'type': task_type,
            'payload': payload,
            'created_at': datetime.now().isoformat(),
            'status': 'pending'
        }
        # Sorted set con prioridad
        r.zadd(self.queue_key, {json.dumps(task): priority})
        return task_id

    def dequeue(self, timeout=0):
        """Obtiene siguiente tarea."""
        # Mover de cola a procesando atómicamente
        result = r.bzpopmin(self.queue_key, timeout)
        if result:
            _, task_json, _ = result
            task = json.loads(task_json)
            task['status'] = 'processing'
            r.hset(self.processing_key, task['id'], json.dumps(task))
            return task
        return None

    def complete(self, task_id, result=None):
        """Marca tarea como completada."""
        task_json = r.hget(self.processing_key, task_id)
        if task_json:
            task = json.loads(task_json)
            task['status'] = 'completed'
            task['result'] = result
            task['completed_at'] = datetime.now().isoformat()

            # Mover a resultados
            r.hset(self.results_key, task_id, json.dumps(task))
            r.hdel(self.processing_key, task_id)
            # TTL para resultados
            r.expire(self.results_key, 3600)

    def fail(self, task_id, error):
        """Marca tarea como fallida."""
        task_json = r.hget(self.processing_key, task_id)
        if task_json:
            task = json.loads(task_json)
            task['status'] = 'failed'
            task['error'] = str(error)
            r.hset(self.results_key, task_id, json.dumps(task))
            r.hdel(self.processing_key, task_id)

    def get_result(self, task_id):
        """Obtiene resultado de tarea."""
        result = r.hget(self.results_key, task_id)
        return json.loads(result) if result else None

queue = TaskQueue()

Notificaciones

notifications.py

import json
from valkey_client import r

class NotificationService:
    def __init__(self):
        self.pubsub = r.pubsub()

    def send(self, user_id, notification):
        """Envía notificación a usuario."""
        channel = f"notifications:{user_id}"
        r.publish(channel, json.dumps(notification))

        # También guardar en lista para offline
        r.lpush(f"notifications:pending:{user_id}", json.dumps(notification))
        r.ltrim(f"notifications:pending:{user_id}", 0, 99)

    def get_pending(self, user_id, limit=10):
        """Obtiene notificaciones pendientes."""
        key = f"notifications:pending:{user_id}"
        notifications = r.lrange(key, 0, limit - 1)
        return [json.loads(n) for n in notifications]

    def clear_pending(self, user_id):
        """Limpia notificaciones pendientes."""
        r.delete(f"notifications:pending:{user_id}")

    def subscribe(self, user_id, callback):
        """Suscribe a notificaciones en tiempo real."""
        channel = f"notifications:{user_id}"
        self.pubsub.subscribe(**{channel: callback})
        return self.pubsub.run_in_thread(sleep_time=0.001)

notifications = NotificationService()

API Principal

main.py

from fastapi import FastAPI, HTTPException, Header
from cache import cached, invalidate
from sessions import sessions
from rate_limiter import rate_limiter
from task_queue import queue
from notifications import notifications

app = FastAPI()

# Middleware de rate limiting
@app.middleware("http")
async def rate_limit_middleware(request, call_next):
    client_ip = request.client.host
    result = rate_limiter.check(client_ip)

    if not result['allowed']:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit. Retry in {result['reset_in']}s"
        )

    response = await call_next(request)
    response.headers['X-RateLimit-Remaining'] = str(result['remaining'])
    return response

# Endpoints
@app.post("/login")
async def login(user_id: str):
    token = sessions.create(user_id)
    return {"token": token}

@app.get("/profile")
async def profile(authorization: str = Header()):
    session = sessions.get(authorization)
    if not session:
        raise HTTPException(401, "Invalid session")
    return get_user_profile(session['user_id'])

@cached('profile')
def get_user_profile(user_id):
    # Simula consulta a DB
    return {"id": user_id, "name": f"User {user_id}"}

@app.post("/tasks")
async def create_task(task_type: str, payload: dict):
    task_id = queue.enqueue(task_type, payload)
    return {"task_id": task_id}

@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    result = queue.get_result(task_id)
    if not result:
        raise HTTPException(404, "Task not found")
    return result

@app.post("/notify/{user_id}")
async def notify(user_id: str, message: str):
    notifications.send(user_id, {"message": message})
    return {"status": "sent"}

Ejecutar el proyecto

# Iniciar servicios
docker-compose up -d

# Instalar dependencias
pip install fastapi uvicorn valkey

# Ejecutar API
uvicorn main:app --reload

# Probar
curl -X POST http://localhost:8000/login?user_id=123
curl -H "Authorization: <token>" http://localhost:8000/profile

Resumen del tutorial

En este tutorial aprendimos:

  1. Introducción: Qué es Valkey y su instalación
  2. Tipos de datos: Strings, Hashes, Lists, Sets, Sorted Sets
  3. Comandos: Operaciones básicas y avanzadas
  4. Persistencia: RDB y AOF
  5. Pub/Sub: Mensajería en tiempo real
  6. Transacciones: MULTI/EXEC y WATCH
  7. Lua: Scripts atómicos
  8. Replicación: Master-Replica y Sentinel
  9. Clustering: Escalabilidad horizontal
  10. Seguridad: ACL y TLS
  11. Rendimiento: Optimización y benchmarks
  12. Clientes: Integración con lenguajes
  13. Proyecto: Aplicación completa