Scalability and Performance
Scalable Architecture
┌──────────────────────────────────────────────────────┐
│                   LLM ARCHITECTURE                    │
│                                                       │
│   Clients → Load Balancer → API Servers (N)           │
│                                     ↓                 │
│                              Rate Limiter             │
│                                     ↓                 │
│            ┌─────── Request Queue ───────┐            │
│            ↓              ↓              ↓            │
│       Provider 1     Provider 2     Self-hosted       │
│        (OpenAI)      (Anthropic)      (vLLM)          │
│            ↓              ↓              ↓            │
│            └────── Response Cache ───────┘            │
│                           ↓                           │
│                        Result                         │
└──────────────────────────────────────────────────────┘
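Before drilling into each layer, here is a rough application-level sketch of how they can compose: check the cache first, then acquire rate-limit capacity, and only then route the call to a provider. It assumes the RateLimiter, LLMCache, and LoadBalancer helpers sketched later in this section, wired as module-level instances; treat it as an orientation aid, not a drop-in implementation.
# Assumed instances of the helpers defined later in this section.
limiter = RateLimiter(max_requests_per_minute=500, max_tokens_per_minute=150000)
cache = LLMCache()
balancer = LoadBalancer()

async def handle_request(messages: list, model: str = "gpt-4o-mini") -> str:
    # 1. Cheapest path first: serve identical, low-temperature requests from cache.
    cached = cache.get(model, messages, temperature=0.0)
    if cached:
        return cached["content"]
    # 2. Apply back-pressure before spending provider quota.
    await limiter.acquire(estimated_tokens=1000)
    # 3. Weighted choice among healthy providers, with failover on errors.
    content = await balancer.call(messages)
    cache.set(model, messages, 0.0, {"content": content})
    return content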
Response Streaming
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI, OpenAI

app = FastAPI()
client = OpenAI()              # synchronous client, reused by the later examples
async_client = AsyncOpenAI()   # async client so streaming does not block the event loop

@app.post("/chat/stream")
async def chat_stream(message: str):
    async def generate():
        # Request a streamed completion and forward each token as an SSE event.
        stream = await async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}],
            stream=True,
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
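On the consumer side, a small sketch of reading that SSE stream token by token; it assumes the httpx library and the service running on localhost:8000 (both are illustrative choices, not part of the example above):
import asyncio
import httpx

async def consume_stream(message: str):
    async with httpx.AsyncClient(timeout=None) as http:
        # FastAPI exposes the `message` argument as a query parameter here.
        async with http.stream("POST", "http://localhost:8000/chat/stream",
                               params={"message": message}) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    print(line[len("data: "):], end="", flush=True)

asyncio.run(consume_stream("Explain streaming in one sentence"))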
Request Batching
import asyncio
class BatchProcessor:
"""Agrupa requests similares para procesar en lote."""
def __init__(self, batch_size=10, max_wait_ms=100):
self.batch_size = batch_size
self.max_wait_ms = max_wait_ms
self.queue = asyncio.Queue()
self.results = {}
async def process_single(self, request_id: str, prompt: str) -> str:
future = asyncio.Future()
await self.queue.put((request_id, prompt, future))
return await future
async def batch_worker(self):
while True:
batch = []
try:
                # Wait for the first item
item = await asyncio.wait_for(
self.queue.get(), timeout=1.0
)
batch.append(item)
                # Collect more items (up to batch_size)
deadline = asyncio.get_event_loop().time() + self.max_wait_ms / 1000
while len(batch) < self.batch_size:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
break
try:
item = await asyncio.wait_for(
self.queue.get(), timeout=remaining
)
batch.append(item)
except asyncio.TimeoutError:
break
                # Process the batch
prompts = [item[1] for item in batch]
responses = await self.batch_llm_call(prompts)
for (req_id, prompt, future), response in zip(batch, responses):
future.set_result(response)
except asyncio.TimeoutError:
continue
async def batch_llm_call(self, prompts: list[str]) -> list[str]:
        # Use asyncio.gather for parallel calls
tasks = [self.single_call(p) for p in prompts]
return await asyncio.gather(*tasks)
async def single_call(self, prompt: str) -> str:
response = await asyncio.to_thread(
client.chat.completions.create,
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
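A brief usage sketch for the batch processor, assuming the OpenAI client defined earlier; it starts the worker as a background task and submits a few concurrent requests:
async def main():
    processor = BatchProcessor(batch_size=5, max_wait_ms=50)
    worker = asyncio.create_task(processor.batch_worker())  # batching loop in the background

    prompts = ["Summarize RAG in one line", "Define rate limiting", "What is vLLM?"]
    results = await asyncio.gather(
        *(processor.process_single(f"req-{i}", p) for i, p in enumerate(prompts))
    )
    for prompt, result in zip(prompts, results):
        print(prompt, "->", result[:60])

    worker.cancel()  # stop the background worker once the batch is done

asyncio.run(main())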
Rate Limiting
import asyncio
import time
from dataclasses import dataclass, field
@dataclass
class RateLimiter:
"""Token bucket rate limiter."""
max_requests_per_minute: int = 60
max_tokens_per_minute: int = 90000
    _request_tokens: float = field(default=0.0, init=False)
    _token_tokens: float = field(default=0.0, init=False)
    _last_refill: float = field(default_factory=time.time, init=False)

    def __post_init__(self):
        # Start with full buckets so the first requests are not throttled.
        self._request_tokens = float(self.max_requests_per_minute)
        self._token_tokens = float(self.max_tokens_per_minute)
def _refill(self):
now = time.time()
elapsed = now - self._last_refill
self._request_tokens = min(
self.max_requests_per_minute,
self._request_tokens + elapsed * (self.max_requests_per_minute / 60)
)
self._token_tokens = min(
self.max_tokens_per_minute,
self._token_tokens + elapsed * (self.max_tokens_per_minute / 60)
)
self._last_refill = now
async def acquire(self, estimated_tokens: int = 1000):
while True:
self._refill()
if self._request_tokens >= 1 and self._token_tokens >= estimated_tokens:
self._request_tokens -= 1
self._token_tokens -= estimated_tokens
return
await asyncio.sleep(0.1)
# Usage
limiter = RateLimiter(max_requests_per_minute=500, max_tokens_per_minute=150000)
async def rate_limited_call(prompt: str):
    estimated = len(prompt.split()) * 2  # rough token estimate
await limiter.acquire(estimated)
return await make_llm_call(prompt)
Response Caching
import hashlib
import json
import redis
class LLMCache:
"""Caché semántico para respuestas LLM."""
def __init__(self, redis_url="redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
        self.ttl = 3600  # 1 hour
def _cache_key(self, model: str, messages: list, temperature: float) -> str:
content = json.dumps({
"model": model,
"messages": messages,
"temperature": temperature,
}, sort_keys=True)
return f"llm:{hashlib.sha256(content.encode()).hexdigest()}"
def get(self, model: str, messages: list, temperature: float):
key = self._cache_key(model, messages, temperature)
cached = self.redis.get(key)
if cached:
return json.loads(cached)
return None
def set(self, model: str, messages: list, temperature: float, response: dict):
        # Only cache when temperature is low (near-deterministic)
if temperature > 0.3:
return
key = self._cache_key(model, messages, temperature)
self.redis.setex(key, self.ttl, json.dumps(response))
def cached_completion(self, **kwargs):
cached = self.get(kwargs["model"], kwargs["messages"], kwargs.get("temperature", 1.0))
if cached:
return cached
response = client.chat.completions.create(**kwargs)
result = {
"content": response.choices[0].message.content,
"usage": dict(response.usage),
}
self.set(kwargs["model"], kwargs["messages"], kwargs.get("temperature", 1.0), result)
return result
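A brief usage sketch, assuming a local Redis instance and the OpenAI client defined earlier; the first call hits the API, and an identical second call is served from Redis:
cache = LLMCache()

result = cache.cached_completion(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Define caching in one sentence"}],
    temperature=0.0,  # low temperature, so the response is cached
)
print(result["content"], result["usage"])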
Load Balancing Across Providers
import random
import time
from dataclasses import dataclass
from typing import Any
@dataclass
class Provider:
name: str
    client: Any
model: str
weight: float
healthy: bool = True
latency_ms: float = 0
class LoadBalancer:
def __init__(self):
self.providers = [
Provider("openai", openai_client, "gpt-4o-mini", weight=0.5),
Provider("anthropic", anthropic_client, "claude-3-haiku-20240307", weight=0.3),
Provider("local", local_client, "llama-3-8b", weight=0.2),
]
def select_provider(self) -> Provider:
        # Weighted random choice among healthy providers
healthy = [p for p in self.providers if p.healthy]
if not healthy:
            raise RuntimeError("No healthy providers available")
total_weight = sum(p.weight for p in healthy)
r = random.uniform(0, total_weight)
cumulative = 0
for provider in healthy:
cumulative += provider.weight
if r <= cumulative:
return provider
return healthy[-1]
async def call(self, messages: list) -> str:
provider = self.select_provider()
try:
start = time.time()
response = await self._call_provider(provider, messages)
provider.latency_ms = (time.time() - start) * 1000
return response
except Exception as e:
provider.healthy = False
            # Retry with another provider
return await self.call(messages)
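One gap in this sketch: a provider marked unhealthy never recovers. A minimal option, sketched below, is a background task that periodically clears the flag so failed providers get retried; a proper circuit breaker with error-rate thresholds would be the more robust choice.
import asyncio

async def health_reset_loop(balancer: LoadBalancer, interval_s: float = 30.0):
    # Periodically re-enable providers so transient failures do not remove them forever.
    while True:
        await asyncio.sleep(interval_s)
        for provider in balancer.providers:
            provider.healthy = True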
Asynchronous Processing Queues
from celery import Celery
app = Celery("llm_tasks", broker="redis://localhost:6379")
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_document(self, document_id: str, task_type: str):
"""Procesar documento con LLM de forma asíncrona."""
try:
document = fetch_document(document_id)
if task_type == "summarize":
result = summarize(document)
elif task_type == "extract":
result = extract_entities(document)
elif task_type == "translate":
result = translate(document)
save_result(document_id, result)
return {"status": "completed", "document_id": document_id}
except RateLimitError as e:
raise self.retry(exc=e, countdown=int(e.retry_after or 60))
# Enqueue a single job
process_document.delay("doc-123", "summarize")
# Enqueue a batch of jobs
from celery import group
batch = group(
process_document.s(doc_id, "summarize")
for doc_id in document_ids
)
result = batch.apply_async()
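If the caller needs the outputs rather than fire-and-forget semantics, the GroupResult returned by apply_async can be inspected; note this requires configuring a result backend on the Celery app (the broker URL alone is not enough), e.g. backend="redis://localhost:6379". A minimal sketch under that assumption:
# Poll the batch without blocking, or block until every task has finished.
if result.ready():
    print("all done:", result.get())
else:
    print(f"{result.completed_count()}/{len(document_ids)} tasks completed")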
Performance Metrics
import time
from prometheus_client import Histogram, Counter, Gauge
# Metrics
REQUEST_LATENCY = Histogram(
"llm_request_duration_seconds",
"Latencia de requests al LLM",
["model", "provider"],
buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
)
TOKENS_USED = Counter(
"llm_tokens_total",
"Tokens consumidos",
["model", "type"], # type: prompt|completion
)
ACTIVE_REQUESTS = Gauge(
"llm_active_requests",
"Requests activos al LLM",
["model"],
)
CACHE_HITS = Counter(
"llm_cache_hits_total",
"Hits de caché",
["result"], # hit|miss
)
async def tracked_completion(**kwargs):
model = kwargs["model"]
ACTIVE_REQUESTS.labels(model=model).inc()
start = time.time()
try:
response = await make_completion(**kwargs)
REQUEST_LATENCY.labels(model=model, provider="openai").observe(
time.time() - start
)
TOKENS_USED.labels(model=model, type="prompt").inc(
response.usage.prompt_tokens
)
TOKENS_USED.labels(model=model, type="completion").inc(
response.usage.completion_tokens
)
return response
finally:
ACTIVE_REQUESTS.labels(model=model).dec()
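To make these metrics scrapeable, prometheus_client can serve them over HTTP; a minimal sketch (port 9100 is an arbitrary choice):
from prometheus_client import start_http_server

# Expose /metrics so Prometheus can scrape the latency, token, and cache counters.
start_http_server(9100)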
Summary
To scale LLM applications: implement streaming for a better user experience, batching for throughput, rate limiting to protect against cost and quota overruns, caching for efficiency, load balancing across providers for resilience, and queues for asynchronous processing. Measure everything with latency, throughput, and token metrics.