Inicio / Inteligencia Artificial / AI Engineering Pro / Eventos, Colas y Asincronía

Eventos, Colas y Asincronía

SQS, SNS, Celery, Step Functions y WebSockets para streaming.

Avanzado
🔒 Solo lectura
📖

Estás en modo lectura

Puedes leer toda la lección, pero para marcar progreso, hacer ejercicios y ganar XP necesitas una cuenta Pro.

Desbloquear por $9/mes

Patrones de Eventos, Colas y Procesamiento Asíncrono

¿Por Qué Procesamiento Async en IA?

Las aplicaciones de IA tienen operaciones lentas: generación de LLM (2-30s), indexación de documentos (minutos), entrenamiento (horas). El procesamiento asíncrono desacopla estas operaciones del request del usuario.

Síncrono (bloquea al usuario):
  Request ──► Index doc ──► Embed ──► Store ──► Response
  (Total: 15-60 segundos, usuario esperando)

Asíncrono (no bloquea):
  Request ──► Queue message ──► Response (200 OK, 100ms)
                    │
                    ▼ (en background)
              Index doc ──► Embed ──► Store ──► Notify

Amazon SQS para Colas de IA

import boto3
import json

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/xxx/ai-indexing-queue"

# ── Producer: encolar trabajo de indexación ──────────────
def enqueue_indexing(document_id: str, content: str, metadata: dict):
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({
            "task": "index_document",
            "document_id": document_id,
            "content": content,
            "metadata": metadata,
        }),
        MessageAttributes={
            "Priority": {"DataType": "String", "StringValue": "normal"},
        },
    )

# ── Consumer: procesar desde la cola ─────────────────────
def process_queue():
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,
            # Long polling (20s): mantiene conexión abierta esperando mensajes
            # vs Short polling (0s): responde inmediatamente aunque esté vacío
            # Long polling reduce costos (menos requests vacíos) y latencia
            WaitTimeSeconds=20,
            # VisibilityTimeout=300: el mensaje se vuelve invisible por 5 min
            # para otros consumers. DEBE ser > tiempo de procesamiento,
            # si no, otro consumer podría procesar el mismo mensaje en paralelo
            VisibilityTimeout=300,
        )

        for message in response.get("Messages", []):
            try:
                body = json.loads(message["Body"])
                process_task(body)

                # delete_message explícito es OBLIGATORIO: SQS usa "at-least-once delivery"
                # Si no eliminas el mensaje, vuelve a ser visible después del
                # VisibilityTimeout y será procesado de nuevo (duplicado)
                sqs.delete_message(
                    QueueUrl=QUEUE_URL,
                    ReceiptHandle=message["ReceiptHandle"],
                )
            except Exception as e:
                # En caso de error, NO eliminamos → el mensaje vuelve a la cola
                # automáticamente después del VisibilityTimeout (300s)
                # Esto da retry automático sin código adicional
                logger.error(f"Error processing: {e}")

Dead Letter Queue para Fallos

# Configurar DLQ para mensajes que fallan repetidamente
dlq_config = {
    "RedrivePolicy": json.dumps({
        "deadLetterTargetArn": "arn:aws:sqs:us-east-1:xxx:ai-indexing-dlq",
        "maxReceiveCount": 3,  # Después de 3 intentos → DLQ
    })
}

Event-Driven Architecture con SNS + SQS

                    ┌─────────────┐
                    │  SNS Topic  │
                    │ "doc-events"│
                    └──────┬──────┘
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │SQS Queue │ │SQS Queue │ │SQS Queue │
        │"indexing" │ │"embedding│ │"notify"  │
        └────┬─────┘ └────┬─────┘ └────┬─────┘
             ▼            ▼            ▼
        Lambda/ECS    Lambda/ECS    Lambda
        (Chunk+Index) (Embed+Store) (Email/Slack)
# Publicar evento cuando un documento es subido
sns = boto3.client("sns")

def on_document_uploaded(document_id: str, s3_key: str):
    sns.publish(
        TopicArn="arn:aws:sns:us-east-1:xxx:doc-events",
        Message=json.dumps({
            "event": "document.uploaded",
            "document_id": document_id,
            "s3_key": s3_key,
            "timestamp": datetime.utcnow().isoformat(),
        }),
        MessageAttributes={
            "event_type": {"DataType": "String", "StringValue": "document.uploaded"},
        },
    )

Celery para Tareas en Background (Self-Hosted)

from celery import Celery, chain, group

app = Celery("ai_tasks", broker="redis://localhost:6379/0")

# bind=True inyecta `self` como primer argumento, necesario para self.retry()
# Sin bind=True, no tendríamos acceso al objeto task para reintentar manualmente
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def chunk_document(self, document_id: str, content: str):
    try:
        chunks = text_splitter.split_text(content)
        return {"document_id": document_id, "chunks": chunks}
    except Exception as exc:
        self.retry(exc=exc)  # Reintenta hasta max_retries, luego propaga excepción

# rate_limit="100/m" protege contra rate limits de APIs externas (ej: OpenAI)
# Celery distribuye las ejecuciones para no exceder 100 llamadas por minuto
@app.task(bind=True, rate_limit="100/m")
def embed_chunks(self, data: dict):
    chunks = data["chunks"]
    embeddings = embedding_model.embed_documents(chunks)
    return {
        "document_id": data["document_id"],
        "chunks": chunks,
        "embeddings": embeddings,
    }

@app.task
def store_vectors(data: dict):
    vectorstore.add_texts(
        texts=data["chunks"],
        embeddings=data["embeddings"],
        metadatas=[{"document_id": data["document_id"]}] * len(data["chunks"]),
    )
    return {"status": "indexed", "chunks_count": len(data["chunks"])}

# ── Workflows ────────────────────────────────────────────

# Pipeline secuencial con chain():
# Ejecuta tareas en serie — el return de cada tarea se pasa como primer argumento a la siguiente
# .s() = signature: crea un objeto tarea sin ejecutarla (lazy), necesario para componer workflows
pipeline = chain(
    chunk_document.s(doc_id, content),  # .s() crea signature con args predefinidos
    embed_chunks.s(),    # Recibe return de chunk_document como primer arg
    store_vectors.s(),   # Recibe return de embed_chunks como primer arg
)
result = pipeline.apply_async()

# group() ejecuta tareas en PARALELO — cada chain corre independientemente
# Ideal para procesar múltiples documentos simultáneamente
batch = group(
    chain(chunk_document.s(d["id"], d["content"]), embed_chunks.s(), store_vectors.s())
    for d in documents
)
batch_result = batch.apply_async()

AWS Step Functions para Pipelines Complejos

{
  "Comment": "Document Processing Pipeline",
  "StartAt": "ValidateDocument",
  "States": {
    "ValidateDocument": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:validate-doc",
      "Next": "ChunkDocument",
      "Catch": [{"ErrorEquals": ["InvalidDocument"], "Next": "NotifyFailure"}]
    },
    "ChunkDocument": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:chunk-doc",
      "Next": "EmbedChunks"
    },
    "EmbedChunks": {
      "Type": "Map",
      "ItemsPath": "$.chunks",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "EmbedSingle",
        "States": {
          "EmbedSingle": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:...:embed-chunk",
            "End": true
          }
        }
      },
      "Next": "StoreVectors"
    },
    "StoreVectors": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:store-vectors",
      "Next": "NotifySuccess"
    },
    "NotifySuccess": {
      "Type": "Task",
      "Resource": "arn:aws:sns:...:doc-events",
      "End": true
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:sns:...:error-alerts",
      "End": true
    }
  }
}

WebSockets para Streaming de Agentes

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/agent/{session_id}")
async def agent_websocket(websocket: WebSocket, session_id: str):
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_json()
            query = data.get("query", "")

            # Streaming de pasos del agente
            async for event in agent.astream_events(query):
                if event["type"] == "thinking":
                    await websocket.send_json({
                        "type": "thinking",
                        "content": event["content"],
                    })
                elif event["type"] == "tool_call":
                    await websocket.send_json({
                        "type": "tool_use",
                        "tool": event["tool"],
                        "input": event["input"],
                    })
                elif event["type"] == "token":
                    await websocket.send_json({
                        "type": "token",
                        "content": event["content"],
                    })

            await websocket.send_json({"type": "done"})

    except WebSocketDisconnect:
        logger.info(f"Client {session_id} disconnected")

Resumen

El procesamiento asíncrono en IA se basa en:

  1. SQS — Colas para desacoplar operaciones lentas
  2. SNS + SQS — Event-driven para pipelines de datos
  3. Celery — Framework de tareas con rate limiting y retries
  4. Step Functions — Orquestación visual de pipelines complejos
  5. WebSockets — Streaming en tiempo real para agentes

🧠 Preguntas de Repaso

1. En Amazon SQS, ¿por qué el VisibilityTimeout debe ser mayor al tiempo de procesamiento del mensaje?

  • A) Para que el mensaje se procese más rápido
  • B) Porque si expira antes de completar el procesamiento, el mensaje se vuelve visible de nuevo y otro consumer podría procesarlo en paralelo, causando duplicados
  • C) Para cumplir con los SLAs de AWS
  • D) Para reducir el costo por mensaje

Respuesta: B) — SQS usa "at-least-once delivery". Si el VisibilityTimeout (ej: 300s) expira antes de que el worker termine y haga delete_message, el mensaje reaparece en la cola y otro consumer lo procesaría nuevamente, causando procesamiento duplicado.

2. En Celery, ¿para qué sirve el parámetro rate_limit="100/m" en una tarea?

  • A) Limita el número total de tareas en la cola
  • B) Protege contra rate limits de APIs externas (como OpenAI), distribuyendo a máximo 100 llamadas por minuto
  • C) Limita el número de workers que pueden ejecutar la tarea
  • D) Controla la velocidad de serialización de los argumentos

Respuesta: B) — El rate_limit en Celery actúa como throttle para proteger APIs externas. Con "100/m", Celery distribuye la ejecución para nunca exceder 100 llamadas por minuto, evitando errores 429 (too many requests) de servicios como OpenAI.

3. ¿Cuál es la diferencia entre chain() y group() en Celery?

  • A) chain() es más rápido, group() es más preciso
  • B) chain() ejecuta tareas en pipeline secuencial (output de una = input de la siguiente), group() ejecuta tareas en paralelo
  • C) chain() es para tareas CPU, group() es para tareas I/O
  • D) No hay diferencia, son aliases

Respuesta: B)chain() crea un pipeline secuencial donde el return de cada tarea se pasa como primer argumento a la siguiente. group() ejecuta múltiples tareas en paralelo, ideal para procesar batch de documentos simultáneamente.

4. En WebSockets para streaming de agentes, ¿cuáles son los tipos de eventos que se envían al cliente?

  • A) start, middle, end
  • B) thinking, tool_call/tool_use, token, done
  • C) request, response, error
  • D) connect, message, disconnect

Respuesta: B) — Los eventos de streaming incluyen: thinking (razonamiento del agente), tool_call/tool_use (cuando usa una herramienta), token (texto generado progresivamente), y done (finalización) — permitiendo al frontend mostrar el proceso completo en tiempo real.

¿Te gustó esta lección?

Con Pro puedes marcar progreso, hacer ejercicios, tomar quizzes, ganar XP y obtener tu constancia.

Ver planes desde $9/mes