Inicio / Inteligencia Artificial / AI Engineering Pro / Monitoreo en Producción

Monitoreo en Producción

CloudWatch, Langfuse, evaluación continua, cost tracking y drift detection.

Avanzado
🔒 Solo lectura
📖

Estás en modo lectura

Puedes leer toda la lección, pero para marcar progreso, hacer ejercicios y ganar XP necesitas una cuenta Pro.

Desbloquear por $9/mes

Monitoreo en Producción: Calidad, Latencia y Costos

¿Por Qué Monitorear Aplicaciones de IA?

Las aplicaciones de IA fallan de formas que el software tradicional no: pueden generar respuestas incorrectas, alucinar datos, degradarse progresivamente sin errores visibles, o multiplicar costos inesperadamente. El monitoreo debe cubrir tres dimensiones:

        ┌─────────────────────────────────────────┐
        │        Monitoreo de IA en Producción     │
        ├─────────────┬──────────────┬─────────────┤
        │  CALIDAD    │  RENDIMIENTO │  COSTOS     │
        │             │              │             │
        │ Faithfulness│ Latencia p50 │ $/request   │
        │ Relevancy   │ Latencia p95 │ $/día       │
        │ Hallucination│ Throughput  │ Token usage  │
        │ User feedback│ Error rate  │ Cache hit %  │
        └─────────────┴──────────────┴─────────────┘

Dashboard de Monitoreo con CloudWatch

import boto3
from datetime import datetime

class AIMetricsPublisher:
    """Publica métricas de IA a CloudWatch."""

    def __init__(self, namespace: str = "AIProduction"):
        self.cw = boto3.client("cloudwatch")
        self.namespace = namespace

    def publish_request_metrics(
        self,
        endpoint: str,
        latency_ms: float,
        tokens_in: int,
        tokens_out: int,
        cost_usd: float,
        model: str,
        cache_hit: bool = False,
    ):
        dimensions = [
            {"Name": "Endpoint", "Value": endpoint},
            {"Name": "Model", "Value": model},
        ]

        self.cw.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    "MetricName": "Latency",
                    "Value": latency_ms,
                    "Unit": "Milliseconds",
                    "Dimensions": dimensions,
                },
                {
                    "MetricName": "InputTokens",
                    "Value": tokens_in,
                    "Unit": "Count",
                    "Dimensions": dimensions,
                },
                {
                    "MetricName": "OutputTokens",
                    "Value": tokens_out,
                    "Unit": "Count",
                    "Dimensions": dimensions,
                },
                {
                    "MetricName": "CostUSD",
                    "Value": cost_usd,
                    "Unit": "None",
                    "Dimensions": dimensions,
                },
                {
                    "MetricName": "CacheHit",
                    "Value": 1 if cache_hit else 0,
                    "Unit": "Count",
                    "Dimensions": dimensions,
                },
            ],
        )

    def publish_quality_metric(
        self,
        metric_name: str,
        value: float,
        endpoint: str,
    ):
        self.cw.put_metric_data(
            Namespace=self.namespace,
            MetricData=[{
                "MetricName": metric_name,
                "Value": value,
                "Unit": "None",
                "Dimensions": [
                    {"Name": "Endpoint", "Value": endpoint},
                ],
            }],
        )

Evaluación Continua en Producción

import asyncio
import random

class ProductionEvaluator:
    """Evalúa calidad de respuestas en producción (sampling)."""

    def __init__(
        self,
        llm_judge,
        metrics_publisher: AIMetricsPublisher,
        sample_rate: float = 0.05,  # Evaluar 5% de requests
    ):
        self.judge = llm_judge
        self.publisher = metrics_publisher
        self.sample_rate = sample_rate

    async def maybe_evaluate(
        self,
        query: str,
        response: str,
        contexts: list[str],
        endpoint: str,
    ):
        """Evaluar asíncronamente si cae en el sampling."""
        if random.random() > self.sample_rate:
            return

        # Ejecutar evaluación en background
        asyncio.create_task(
            self._evaluate_and_publish(
                query, response, contexts, endpoint
            )
        )

    async def _evaluate_and_publish(
        self,
        query: str,
        response: str,
        contexts: list[str],
        endpoint: str,
    ):
        # Evaluar faithfulness con LLM-as-judge
        faithfulness = await self.judge.evaluate_faithfulness(
            response=response,
            contexts=contexts,
        )

        relevancy = await self.judge.evaluate_relevancy(
            query=query,
            response=response,
        )

        # Publicar métricas
        self.publisher.publish_quality_metric(
            "Faithfulness", faithfulness, endpoint
        )
        self.publisher.publish_quality_metric(
            "Relevancy", relevancy, endpoint
        )

        # Alertar si calidad baja
        if faithfulness < 0.7:
            await self._send_alert(
                f"⚠️ Low faithfulness: {faithfulness:.2f} on {endpoint}"
            )

CloudWatch Alarms

def create_ai_alarms(cw_client, sns_topic_arn: str):
    """Crear alarmas para métricas críticas de IA."""

    alarms = [
        # ── Calidad ──────────────────────────────────
        {
            "AlarmName": "AI-LowFaithfulness",
            "MetricName": "Faithfulness",
            "Threshold": 0.80,
            "ComparisonOperator": "LessThanThreshold",
            "EvaluationPeriods": 3,
            "Period": 300,  # 5 minutos
            "Statistic": "Average",
            "TreatMissingData": "notBreaching",
        },
        # ── Latencia ─────────────────────────────────
        {
            "AlarmName": "AI-HighLatency-P95",
            "MetricName": "Latency",
            "Threshold": 10000,  # 10 segundos
            "ComparisonOperator": "GreaterThanThreshold",
            "EvaluationPeriods": 2,
            "Period": 300,
            "ExtendedStatistic": "p95",
        },
        # ── Costos ───────────────────────────────────
        {
            "AlarmName": "AI-HighCostPerDay",
            "MetricName": "CostUSD",
            "Threshold": 100,  # $100/día
            "ComparisonOperator": "GreaterThanThreshold",
            "EvaluationPeriods": 1,
            "Period": 86400,  # 24h
            "Statistic": "Sum",
        },
        # ── Error rate ───────────────────────────────
        {
            "AlarmName": "AI-HighErrorRate",
            "MetricName": "ErrorCount",
            "Threshold": 50,
            "ComparisonOperator": "GreaterThanThreshold",
            "EvaluationPeriods": 2,
            "Period": 300,
            "Statistic": "Sum",
        },
    ]

    for alarm_config in alarms:
        cw_client.put_metric_alarm(
            **alarm_config,
            Namespace="AIProduction",
            AlarmActions=[sns_topic_arn],
            Dimensions=[{"Name": "Endpoint", "Value": "rag-api"}],
        )

Langfuse: Observabilidad Especializada para LLMs

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

langfuse = Langfuse()

# @observe() — crea automáticamente un span/trace por cada
# invocación de la función. Al ser la función más externa,
# genera el trace raíz; funciones internas decoradas con
# @observe() crean child spans anidados automáticamente.
@observe()
async def rag_query(query: str) -> dict:
    """Pipeline RAG completamente trazado."""

    # 1. Embedding — traza automática
    # update_current_observation() agrega metadata al span
    # activo, permitiendo filtrar y buscar en el dashboard
    langfuse_context.update_current_observation(
        metadata={"step": "embedding"}
    )
    embedding = await get_embedding(query)

    # 2. Retrieval
    langfuse_context.update_current_observation(
        metadata={"step": "retrieval"}
    )
    contexts = await vector_search(embedding, top_k=5)

    # 3. Generation
    # Si generate_answer también tiene @observe(), Langfuse
    # crea un child span dentro de este trace automáticamente
    response = await generate_answer(query, contexts)

    # Registrar scores
    langfuse_context.score_current_trace(
        name="user_feedback",
        value=None,  # Se llena después con feedback del usuario
    )

    return {"answer": response, "contexts": contexts}

# flush() envía todos los eventos en buffer al servidor de
# Langfuse. Importante llamarlo antes de que el proceso
# termine, o se perderían trazas pendientes en el buffer.
# langfuse.flush()

# ── Feedback del usuario → Langfuse ─────────────────
async def record_feedback(trace_id: str, score: int, comment: str = ""):
    langfuse.score(
        trace_id=trace_id,
        name="user_feedback",
        value=score,        # 1-5
        comment=comment,
    )

Dashboard de Costos

class CostTracker:
    """Rastrear y proyectar costos de IA."""

    PRICING = {
        # Modelos por 1M tokens
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
        "claude-haiku": {"input": 0.25, "output": 1.25},
        "amazon.titan-embed-text-v2": {"input": 0.02, "output": 0.0},
    }

    def calculate_cost(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
    ) -> float:
        pricing = self.PRICING.get(model, {"input": 0, "output": 0})
        cost = (
            (input_tokens / 1_000_000) * pricing["input"] +
            (output_tokens / 1_000_000) * pricing["output"]
        )
        return round(cost, 6)

    def daily_report(self, requests: list[dict]) -> dict:
        total_cost = sum(r["cost_usd"] for r in requests)
        total_requests = len(requests)

        by_model = {}
        for r in requests:
            model = r["model"]
            if model not in by_model:
                by_model[model] = {"cost": 0, "requests": 0, "tokens": 0}
            by_model[model]["cost"] += r["cost_usd"]
            by_model[model]["requests"] += 1
            by_model[model]["tokens"] += r["input_tokens"] + r["output_tokens"]

        return {
            "date": datetime.now().isoformat()[:10],
            "total_cost_usd": round(total_cost, 2),
            "total_requests": total_requests,
            "avg_cost_per_request": round(total_cost / max(total_requests, 1), 4),
            "by_model": by_model,
            "projected_monthly": round(total_cost * 30, 2),
        }

Drift Detection

class QualityDriftDetector:
    """Detectar degradación gradual en calidad.

    ¿Por qué monitorear drift? Los modelos se degradan por:
    - Cambios en la distribución de datos de los usuarios
    - Actualizaciones silenciosas del proveedor del modelo
    - Cambios en dependencias (prompts, retrieval, etc.)
    Sin detección, la calidad cae gradualmente sin alertas.
    """

    def __init__(self, window_size: int = 100):
        # Ventana deslizante: conserva las últimas N evaluaciones
        # para calcular un promedio móvil y comparar períodos
        self.window_size = window_size
        self.scores = []

    def add_score(self, metric: str, value: float):
        self.scores.append({"metric": metric, "value": value})
        # Mantener máximo 2x ventana: mitad "antigua" vs mitad "nueva"
        if len(self.scores) > self.window_size * 2:
            self.scores = self.scores[-self.window_size * 2:]

    def check_drift(self, metric: str) -> dict:
        values = [s["value"] for s in self.scores if s["metric"] == metric]
        if len(values) < self.window_size:
            return {"drift": False, "reason": "insufficient data"}

        # Dividir en dos mitades: período anterior vs período reciente
        midpoint = len(values) // 2
        old_avg = sum(values[:midpoint]) / midpoint
        new_avg = sum(values[midpoint:]) / (len(values) - midpoint)

        # Fórmula de detección: abs(current_avg - baseline_avg) / baseline
        # Calcula el cambio porcentual entre ambos períodos
        drift_pct = (new_avg - old_avg) / max(old_avg, 0.01) * 100

        return {
            # Umbral del 5%: si la calidad cambia más de 5% entre
            # períodos, se considera drift significativo. Un 5% de
            # degradación en faithfulness puede significar más
            # alucinaciones llegando a los usuarios.
            "drift": abs(drift_pct) > 5,  # >5% cambio
            "direction": "degrading" if drift_pct < 0 else "improving",
            "change_pct": round(drift_pct, 2),
            "old_avg": round(old_avg, 3),
            "new_avg": round(new_avg, 3),
        }

Resumen

Las 3 Dimensiones del Monitoreo

Dimensión Métricas Clave Herramientas
Calidad Faithfulness, relevancy, hallucination rate Langfuse, LLM-as-judge
Rendimiento Latencia p50/p95, throughput, error rate CloudWatch, Prometheus
Costos $/request, $/día, token usage, cache hit rate Custom tracker, AWS Cost Explorer

Checklist de Monitoreo

  • Métricas de calidad evaluadas por sampling (5-10% de requests)
  • Alarmas de latencia (p95 > umbral)
  • Alarmas de costos (diario y semanal)
  • Dashboard por modelo y endpoint
  • Drift detection para degradación gradual
  • Feedback de usuarios conectado a traces
  • Revisión semanal de métricas con el equipo

🧠 Preguntas de Repaso

1. ¿Cuáles son las 3 dimensiones del monitoreo de sistemas de IA en producción?

  • A) Frontend, Backend, Base de datos
  • B) Calidad (faithfulness, hallucination), Rendimiento (latencia p50/p95, throughput) y Costos ($/request, cache hit %)
  • C) Desarrollo, Staging, Producción
  • D) Input, Processing, Output

Respuesta: B) — Las 3 dimensiones son: Calidad (faithfulness, relevancy, hallucination rate, feedback), Rendimiento (latencia p50/p95, throughput, error rate) y Costos ($/request, $/día, token usage, cache hit %).

2. En un ProductionEvaluator con sample_rate=0.05, ¿qué porcentaje de requests se evalúa y por qué no se evalúa el 100%?

  • A) 50%, para tener significancia estadística
  • B) 5%, porque evaluar con LLM-as-judge cada request sería demasiado costoso y lento
  • C) 0.5%, para no afectar la latencia
  • D) 100%, siempre se debe evaluar todo

Respuesta: B) — Se evalúa solo el 5% de requests porque la evaluación con LLM-as-judge requiere una llamada adicional al LLM, lo que agregaría costo y latencia si se hiciera para cada request. El 5% es suficiente para detectar tendencias y problemas.

3. ¿Qué detecta un QualityDriftDetector y cuál es el umbral de alerta?

  • A) Detecta errores de red y alerta con >10% de pérdida de paquetes
  • B) Compara métricas entre períodos recientes y anteriores, alertando cuando el cambio excede 5% — indicando degradación o mejora gradual
  • C) Detecta cambios en el código fuente y alerta por commits sin review
  • D) Monitorea el uso de disco y alerta con >80% de ocupación

Respuesta: B) — El drift detector divide la ventana de datos en dos mitades (anterior vs reciente), calcula la fórmula (new_avg − old_avg) / old_avg × 100, y alerta si el cambio supera 5%. Reporta dirección: "degrading" o "improving".

4. Según la tabla de precios por 1M tokens, ¿cuánto más caro es claude-sonnet-4 que gpt-4o-mini en tokens de output?

  • A) 2x más caro
  • B) 10x más caro
  • C) 25x más caro ($15.00 vs $0.60 por 1M tokens de output)
  • D) El mismo precio

Respuesta: C) — Claude Sonnet 4 cuesta $15.00/1M tokens de output vs $0.60/1M de gpt-4o-mini, una diferencia de 25x. Esta diferencia justifica el model routing: usar el modelo costoso solo para queries complejos.

¿Te gustó esta lección?

Con Pro puedes marcar progreso, hacer ejercicios, tomar quizzes, ganar XP y obtener tu constancia.

Ver planes desde $9/mes