Inicio / LLMOps / LLMOps: De Prototipo a Producción / Escalabilidad y Rendimiento

Escalabilidad y Rendimiento

Streaming, batching, rate limiting, caché y load balancing.

Avanzado
🔒 Solo lectura
📖

Estás en modo lectura

Puedes leer toda la lección, pero para marcar progreso, hacer ejercicios y ganar XP necesitas una cuenta Pro.

Desbloquear por $9/mes

Escalabilidad y Rendimiento

Arquitectura Escalable

┌─────────────────────────────────────────────────┐
│              ARQUITECTURA LLM                    │
│                                                  │
│  Clientes → Load Balancer → API Servers (N)     │
│                    ↓                             │
│              Rate Limiter                        │
│                    ↓                             │
│         ┌─── Request Queue ───┐                  │
│         ↓         ↓           ↓                  │
│    Proveedor1  Proveedor2  Self-hosted           │
│    (OpenAI)   (Anthropic)   (vLLM)              │
│         ↓         ↓           ↓                  │
│         └─── Response Cache ──┘                  │
│                    ↓                             │
│              Resultado                           │
└─────────────────────────────────────────────────┘

Streaming de Respuestas

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

# Module-level singletons shared by every request handler below.
app = FastAPI()
client = OpenAI()  # reads OPENAI_API_KEY from the environment by default

@app.post("/chat/stream")
async def chat_stream(message: str):
    """Stream a chat completion to the client as Server-Sent Events.

    The OpenAI client used here is synchronous: iterating its stream inside
    an ``async def`` generator would block the event loop between chunks.
    Declaring ``generate`` as a plain (sync) generator lets Starlette drive
    it in its threadpool, keeping the loop free for other requests.
    """
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}],
            stream=True,
        )
        for chunk in stream:
            # Some chunks (role-only deltas, final usage chunks) carry no
            # text and may even have an empty choices list — skip those.
            if chunk.choices and chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Request Batching

import asyncio
from collections import defaultdict

class BatchProcessor:
    """Group similar requests so they can be processed as one batch.

    Callers await :meth:`process_single`; a background :meth:`batch_worker`
    task drains the queue, packs up to ``batch_size`` items (waiting at most
    ``max_wait_ms`` for stragglers) and resolves each caller's future.
    """

    def __init__(self, batch_size=10, max_wait_ms=100):
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = asyncio.Queue()
        self.results = {}  # kept for interface compatibility; unused internally

    async def process_single(self, request_id: str, prompt: str) -> str:
        """Enqueue one prompt and wait for its batched result."""
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request_id, prompt, future))
        return await future

    async def batch_worker(self):
        """Run forever: collect a batch from the queue and resolve its futures."""
        while True:
            batch = []
            try:
                # Block (up to 1s) for the first item; loop again on timeout.
                item = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            batch.append(item)

            # Collect more items until the batch fills or max_wait_ms passes.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + self.max_wait_ms / 1000
            while len(batch) < self.batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    item = await asyncio.wait_for(
                        self.queue.get(), timeout=remaining
                    )
                except asyncio.TimeoutError:
                    break
                batch.append(item)

            prompts = [prompt for _, prompt, _ in batch]
            try:
                responses = await self.batch_llm_call(prompts)
            except Exception as exc:
                # Propagate the failure to every waiter: leaving the futures
                # unresolved would make each process_single() caller hang
                # forever.
                for _, _, future in batch:
                    if not future.done():
                        future.set_exception(exc)
                continue

            for (_, _, future), response in zip(batch, responses):
                if not future.done():
                    future.set_result(response)

    async def batch_llm_call(self, prompts: list[str]) -> list[str]:
        """Fan the batch out as parallel single calls."""
        tasks = [self.single_call(p) for p in prompts]
        return await asyncio.gather(*tasks)

    async def single_call(self, prompt: str) -> str:
        """One blocking SDK call, offloaded to a thread to keep the loop free."""
        # NOTE(review): relies on a module-level `client` defined elsewhere
        # in this lesson (the OpenAI client from the streaming section).
        response = await asyncio.to_thread(
            client.chat.completions.create,
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

Rate Limiting

import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class RateLimiter:
    """Token-bucket rate limiter over requests/min and tokens/min.

    Both buckets start full and refill continuously at ``max/60`` per
    second. ``acquire`` polls every 100ms until both budgets allow the
    call. Safe within a single event loop (no ``await`` between the check
    and the decrement); not thread-safe.
    """

    max_requests_per_minute: int = 60
    max_tokens_per_minute: int = 90000

    _request_tokens: float = field(default=0, init=False)
    _token_tokens: float = field(default=0, init=False)
    _last_refill: float = field(default_factory=time.time, init=False)

    def __post_init__(self):
        # Start with full buckets: an empty bucket would force every fresh
        # limiter to sleep ~1s or more before serving its first request.
        self._request_tokens = float(self.max_requests_per_minute)
        self._token_tokens = float(self.max_tokens_per_minute)

    def _refill(self):
        """Top both buckets up according to the time elapsed since last refill."""
        now = time.time()
        elapsed = now - self._last_refill

        self._request_tokens = min(
            self.max_requests_per_minute,
            self._request_tokens + elapsed * (self.max_requests_per_minute / 60)
        )
        self._token_tokens = min(
            self.max_tokens_per_minute,
            self._token_tokens + elapsed * (self.max_tokens_per_minute / 60)
        )
        self._last_refill = now

    async def acquire(self, estimated_tokens: int = 1000):
        """Block until one request slot and ``estimated_tokens`` are available."""
        while True:
            self._refill()
            if self._request_tokens >= 1 and self._token_tokens >= estimated_tokens:
                self._request_tokens -= 1
                self._token_tokens -= estimated_tokens
                return
            await asyncio.sleep(0.1)

# Usage: one shared limiter guards every outbound LLM call.
limiter = RateLimiter(max_requests_per_minute=500, max_tokens_per_minute=150000)

async def rate_limited_call(prompt: str):
    """Reserve budget from the shared limiter, then perform the call."""
    # Rough token estimate: ~2 tokens per whitespace-separated word.
    word_count = len(prompt.split())
    await limiter.acquire(word_count * 2)
    return await make_llm_call(prompt)

Caché de Respuestas

import hashlib
import json
import redis

class LLMCache:
    """Exact-match response cache for LLM calls, backed by Redis.

    Keys are derived from a SHA-256 hash of (model, messages, temperature);
    only low-temperature (near-deterministic) responses are stored, and
    entries expire after ``ttl`` seconds.
    """

    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.ttl = 3600  # entries live for one hour

    def _cache_key(self, model: str, messages: list, temperature: float) -> str:
        """Build a deterministic Redis key for the request parameters."""
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }
        digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True).encode()
        ).hexdigest()
        return f"llm:{digest}"

    def get(self, model: str, messages: list, temperature: float):
        """Return the cached response dict, or None on a miss."""
        cached = self.redis.get(self._cache_key(model, messages, temperature))
        return json.loads(cached) if cached else None

    def set(self, model: str, messages: list, temperature: float, response: dict):
        """Store a response, but only when sampling is near-deterministic."""
        # High-temperature outputs vary between calls; caching one would pin
        # an arbitrary sample, so anything above 0.3 is skipped.
        if temperature > 0.3:
            return

        self.redis.setex(
            self._cache_key(model, messages, temperature),
            self.ttl,
            json.dumps(response),
        )

    def cached_completion(self, **kwargs):
        """Completion call with read-through caching on (model, messages, temp)."""
        model = kwargs["model"]
        messages = kwargs["messages"]
        temperature = kwargs.get("temperature", 1.0)

        hit = self.get(model, messages, temperature)
        if hit:
            return hit

        response = client.chat.completions.create(**kwargs)
        result = {
            "content": response.choices[0].message.content,
            "usage": dict(response.usage),
        }
        self.set(model, messages, temperature, result)
        return result

Load Balancing entre Proveedores

import random
from dataclasses import dataclass

@dataclass
class Provider:
    """One upstream LLM backend plus its routing/health metadata."""

    name: str
    # The backend's SDK client object; annotated `object` because each
    # provider (OpenAI, Anthropic, local vLLM) ships a different client
    # class. The original annotation `any` referenced the builtin function
    # `any`, not a type — `object` (or `typing.Any`) is what was intended.
    client: object
    model: str
    weight: float          # relative share of traffic for weighted routing
    healthy: bool = True   # flipped to False by the load balancer on failure
    latency_ms: float = 0  # last observed call latency, in milliseconds

class LoadBalancer:
    def __init__(self):
        self.providers = [
            Provider("openai", openai_client, "gpt-4o-mini", weight=0.5),
            Provider("anthropic", anthropic_client, "claude-3-haiku-20240307", weight=0.3),
            Provider("local", local_client, "llama-3-8b", weight=0.2),
        ]
    
    def select_provider(self) -> Provider:
        # Weighted random entre proveedores saludables
        healthy = [p for p in self.providers if p.healthy]
        if not healthy:
            raise RuntimeError("No hay proveedores disponibles")
        
        total_weight = sum(p.weight for p in healthy)
        r = random.uniform(0, total_weight)
        cumulative = 0
        for provider in healthy:
            cumulative += provider.weight
            if r <= cumulative:
                return provider
        return healthy[-1]
    
    async def call(self, messages: list) -> str:
        provider = self.select_provider()
        try:
            start = time.time()
            response = await self._call_provider(provider, messages)
            provider.latency_ms = (time.time() - start) * 1000
            return response
        except Exception as e:
            provider.healthy = False
            # Retry con otro proveedor
            return await self.call(messages)

Colas de Procesamiento Asíncrono

from celery import Celery

# Celery application: Redis acts as the message broker for the task queue.
app = Celery("llm_tasks", broker="redis://localhost:6379")

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_document(self, document_id: str, task_type: str):
    """Process a document with an LLM asynchronously.

    Dispatches on ``task_type`` (summarize / extract / translate), persists
    the result, and retries (up to 3 times) when the provider rate-limits.
    """
    try:
        document = fetch_document(document_id)

        if task_type == "summarize":
            result = summarize(document)
        elif task_type == "extract":
            result = extract_entities(document)
        elif task_type == "translate":
            result = translate(document)
        else:
            # Without this branch an unknown task_type left `result`
            # unbound and crashed with NameError at save_result().
            raise ValueError(f"Unknown task_type: {task_type!r}")

        save_result(document_id, result)
        return {"status": "completed", "document_id": document_id}

    except RateLimitError as e:
        # NOTE(review): assumes RateLimitError exposes `retry_after` —
        # confirm against the SDK in use.
        raise self.retry(exc=e, countdown=int(e.retry_after or 60))

# Enqueue a single job
process_document.delay("doc-123", "summarize")

# Enqueue a batch of jobs as one Celery group
# NOTE(review): `document_ids` must be defined by the surrounding context.
from celery import group
batch = group(
    process_document.s(doc_id, "summarize")
    for doc_id in document_ids
)
result = batch.apply_async()

Métricas de Rendimiento

import time
from prometheus_client import Histogram, Counter, Gauge

# Prometheus metrics registered at import time (module-level singletons).
REQUEST_LATENCY = Histogram(
    "llm_request_duration_seconds",
    "Latencia de requests al LLM",
    ["model", "provider"],
    # Bucket edges in seconds, spanning fast cache hits to long generations.
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
)

TOKENS_USED = Counter(
    "llm_tokens_total",
    "Tokens consumidos",
    ["model", "type"],  # "type" label takes: prompt | completion
)

ACTIVE_REQUESTS = Gauge(
    "llm_active_requests",
    "Requests activos al LLM",
    ["model"],
)

CACHE_HITS = Counter(
    "llm_cache_hits_total",
    "Hits de caché",
    ["result"],  # "result" label takes: hit | miss
)

async def tracked_completion(**kwargs):
    """Call make_completion while recording Prometheus metrics.

    Tracks in-flight request count, end-to-end latency, and prompt/completion
    token usage. On failure only the gauge is decremented; latency and token
    counters are recorded solely for successful calls.
    """
    model = kwargs["model"]
    ACTIVE_REQUESTS.labels(model=model).inc()

    started = time.time()
    try:
        response = await make_completion(**kwargs)

        elapsed = time.time() - started
        REQUEST_LATENCY.labels(model=model, provider="openai").observe(elapsed)

        usage = response.usage
        TOKENS_USED.labels(model=model, type="prompt").inc(usage.prompt_tokens)
        TOKENS_USED.labels(model=model, type="completion").inc(usage.completion_tokens)

        return response
    finally:
        # Always runs, so the gauge never leaks on exceptions.
        ACTIVE_REQUESTS.labels(model=model).dec()

Resumen

Para escalar aplicaciones LLM: implementa streaming para mejor UX, usa batching para throughput, rate limiting para protección de costos, caché para eficiencia, load balancing entre proveedores para resiliencia, y colas para procesamiento asíncrono. Mide todo con métricas de latencia, throughput y tokens.

🔒

Ejercicio práctico disponible

Rate limiter y caché para LLMs

Desbloquear ejercicios
// Rate limiter y caché para LLMs
// Desbloquea Pro para acceder a este ejercicio
// y ganar +50 XP al completarlo

function ejemplo() {
    // Tu código aquí...
}

¿Te gustó esta lección?

Con Pro puedes marcar progreso, hacer ejercicios, tomar quizzes, ganar XP y obtener tu constancia.

Ver planes desde $9/mes