Patrones de Eventos, Colas y Procesamiento Asíncrono
¿Por Qué Procesamiento Async en IA?
Las aplicaciones de IA tienen operaciones lentas: generación de LLM (2-30s), indexación de documentos (minutos), entrenamiento (horas). El procesamiento asíncrono desacopla estas operaciones del request del usuario.
Síncrono (bloquea al usuario):
Request ──► Index doc ──► Embed ──► Store ──► Response
(Total: 15-60 segundos, usuario esperando)
Asíncrono (no bloquea):
Request ──► Queue message ──► Response (200 OK, 100ms)
│
▼ (en background)
Index doc ──► Embed ──► Store ──► Notify
Amazon SQS para Colas de IA
import boto3
import json
sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/xxx/ai-indexing-queue"
# ── Producer: encolar trabajo de indexación ──────────────
def enqueue_indexing(document_id: str, content: str, metadata: dict):
sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps({
"task": "index_document",
"document_id": document_id,
"content": content,
"metadata": metadata,
}),
MessageAttributes={
"Priority": {"DataType": "String", "StringValue": "normal"},
},
)
# ── Consumer: procesar desde la cola ─────────────────────
def process_queue():
while True:
response = sqs.receive_message(
QueueUrl=QUEUE_URL,
MaxNumberOfMessages=10,
# Long polling (20s): mantiene conexión abierta esperando mensajes
# vs Short polling (0s): responde inmediatamente aunque esté vacío
# Long polling reduce costos (menos requests vacíos) y latencia
WaitTimeSeconds=20,
# VisibilityTimeout=300: el mensaje se vuelve invisible por 5 min
# para otros consumers. DEBE ser > tiempo de procesamiento,
# si no, otro consumer podría procesar el mismo mensaje en paralelo
VisibilityTimeout=300,
)
for message in response.get("Messages", []):
try:
body = json.loads(message["Body"])
process_task(body)
# delete_message explícito es OBLIGATORIO: SQS usa "at-least-once delivery"
# Si no eliminas el mensaje, vuelve a ser visible después del
# VisibilityTimeout y será procesado de nuevo (duplicado)
sqs.delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=message["ReceiptHandle"],
)
except Exception as e:
# En caso de error, NO eliminamos → el mensaje vuelve a la cola
# automáticamente después del VisibilityTimeout (300s)
# Esto da retry automático sin código adicional
logger.error(f"Error processing: {e}")
Dead Letter Queue para Fallos
# Configurar DLQ para mensajes que fallan repetidamente
dlq_config = {
"RedrivePolicy": json.dumps({
"deadLetterTargetArn": "arn:aws:sqs:us-east-1:xxx:ai-indexing-dlq",
"maxReceiveCount": 3, # Después de 3 intentos → DLQ
})
}
Event-Driven Architecture con SNS + SQS
┌─────────────┐
│ SNS Topic │
│ "doc-events"│
└──────┬──────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│SQS Queue │ │SQS Queue │ │SQS Queue │
│"indexing" │ │"embedding│ │"notify" │
└────┬─────┘ └────┬─────┘ └────┬─────┘
▼ ▼ ▼
Lambda/ECS Lambda/ECS Lambda
(Chunk+Index) (Embed+Store) (Email/Slack)
# Publicar evento cuando un documento es subido
sns = boto3.client("sns")
def on_document_uploaded(document_id: str, s3_key: str):
sns.publish(
TopicArn="arn:aws:sns:us-east-1:xxx:doc-events",
Message=json.dumps({
"event": "document.uploaded",
"document_id": document_id,
"s3_key": s3_key,
"timestamp": datetime.utcnow().isoformat(),
}),
MessageAttributes={
"event_type": {"DataType": "String", "StringValue": "document.uploaded"},
},
)
Celery para Tareas en Background (Self-Hosted)
from celery import Celery, chain, group
app = Celery("ai_tasks", broker="redis://localhost:6379/0")
# bind=True inyecta `self` como primer argumento, necesario para self.retry()
# Sin bind=True, no tendríamos acceso al objeto task para reintentar manualmente
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def chunk_document(self, document_id: str, content: str):
try:
chunks = text_splitter.split_text(content)
return {"document_id": document_id, "chunks": chunks}
except Exception as exc:
self.retry(exc=exc) # Reintenta hasta max_retries, luego propaga excepción
# rate_limit="100/m" protege contra rate limits de APIs externas (ej: OpenAI)
# Celery distribuye las ejecuciones para no exceder 100 llamadas por minuto
@app.task(bind=True, rate_limit="100/m")
def embed_chunks(self, data: dict):
chunks = data["chunks"]
embeddings = embedding_model.embed_documents(chunks)
return {
"document_id": data["document_id"],
"chunks": chunks,
"embeddings": embeddings,
}
@app.task
def store_vectors(data: dict):
vectorstore.add_texts(
texts=data["chunks"],
embeddings=data["embeddings"],
metadatas=[{"document_id": data["document_id"]}] * len(data["chunks"]),
)
return {"status": "indexed", "chunks_count": len(data["chunks"])}
# ── Workflows ────────────────────────────────────────────
# Pipeline secuencial con chain():
# Ejecuta tareas en serie — el return de cada tarea se pasa como primer argumento a la siguiente
# .s() = signature: crea un objeto tarea sin ejecutarla (lazy), necesario para componer workflows
pipeline = chain(
chunk_document.s(doc_id, content), # .s() crea signature con args predefinidos
embed_chunks.s(), # Recibe return de chunk_document como primer arg
store_vectors.s(), # Recibe return de embed_chunks como primer arg
)
result = pipeline.apply_async()
# group() ejecuta tareas en PARALELO — cada chain corre independientemente
# Ideal para procesar múltiples documentos simultáneamente
batch = group(
chain(chunk_document.s(d["id"], d["content"]), embed_chunks.s(), store_vectors.s())
for d in documents
)
batch_result = batch.apply_async()
AWS Step Functions para Pipelines Complejos
{
"Comment": "Document Processing Pipeline",
"StartAt": "ValidateDocument",
"States": {
"ValidateDocument": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:validate-doc",
"Next": "ChunkDocument",
"Catch": [{"ErrorEquals": ["InvalidDocument"], "Next": "NotifyFailure"}]
},
"ChunkDocument": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:chunk-doc",
"Next": "EmbedChunks"
},
"EmbedChunks": {
"Type": "Map",
"ItemsPath": "$.chunks",
"MaxConcurrency": 10,
"Iterator": {
"StartAt": "EmbedSingle",
"States": {
"EmbedSingle": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:embed-chunk",
"End": true
}
}
},
"Next": "StoreVectors"
},
"StoreVectors": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:store-vectors",
"Next": "NotifySuccess"
},
"NotifySuccess": {
"Type": "Task",
"Resource": "arn:aws:sns:...:doc-events",
"End": true
},
"NotifyFailure": {
"Type": "Task",
"Resource": "arn:aws:sns:...:error-alerts",
"End": true
}
}
}
WebSockets para Streaming de Agentes
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/agent/{session_id}")
async def agent_websocket(websocket: WebSocket, session_id: str):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
query = data.get("query", "")
# Streaming de pasos del agente
async for event in agent.astream_events(query):
if event["type"] == "thinking":
await websocket.send_json({
"type": "thinking",
"content": event["content"],
})
elif event["type"] == "tool_call":
await websocket.send_json({
"type": "tool_use",
"tool": event["tool"],
"input": event["input"],
})
elif event["type"] == "token":
await websocket.send_json({
"type": "token",
"content": event["content"],
})
await websocket.send_json({"type": "done"})
except WebSocketDisconnect:
logger.info(f"Client {session_id} disconnected")
Resumen
El procesamiento asíncrono en IA se basa en:
- SQS — Colas para desacoplar operaciones lentas
- SNS + SQS — Event-driven para pipelines de datos
- Celery — Framework de tareas con rate limiting y retries
- Step Functions — Orquestación visual de pipelines complejos
- WebSockets — Streaming en tiempo real para agentes
🧠 Preguntas de Repaso
1. En Amazon SQS, ¿por qué el VisibilityTimeout debe ser mayor al tiempo de procesamiento del mensaje?
- A) Para que el mensaje se procese más rápido
- B) Porque si expira antes de completar el procesamiento, el mensaje se vuelve visible de nuevo y otro consumer podría procesarlo en paralelo, causando duplicados
- C) Para cumplir con los SLAs de AWS
- D) Para reducir el costo por mensaje
Respuesta: B) — SQS usa "at-least-once delivery". Si el VisibilityTimeout (ej: 300s) expira antes de que el worker termine y haga
delete_message, el mensaje reaparece en la cola y otro consumer lo procesaría nuevamente, causando procesamiento duplicado.
2. En Celery, ¿para qué sirve el parámetro rate_limit="100/m" en una tarea?
- A) Limita el número total de tareas en la cola
- B) Protege contra rate limits de APIs externas (como OpenAI), distribuyendo a máximo 100 llamadas por minuto
- C) Limita el número de workers que pueden ejecutar la tarea
- D) Controla la velocidad de serialización de los argumentos
Respuesta: B) — El
rate_limiten Celery actúa como throttle para proteger APIs externas. Con "100/m", Celery distribuye la ejecución para nunca exceder 100 llamadas por minuto, evitando errores 429 (too many requests) de servicios como OpenAI.
3. ¿Cuál es la diferencia entre chain() y group() en Celery?
- A) chain() es más rápido, group() es más preciso
- B) chain() ejecuta tareas en pipeline secuencial (output de una = input de la siguiente), group() ejecuta tareas en paralelo
- C) chain() es para tareas CPU, group() es para tareas I/O
- D) No hay diferencia, son aliases
Respuesta: B) —
chain()crea un pipeline secuencial donde el return de cada tarea se pasa como primer argumento a la siguiente.group()ejecuta múltiples tareas en paralelo, ideal para procesar batch de documentos simultáneamente.
4. En WebSockets para streaming de agentes, ¿cuáles son los tipos de eventos que se envían al cliente?
- A) start, middle, end
- B) thinking, tool_call/tool_use, token, done
- C) request, response, error
- D) connect, message, disconnect
Respuesta: B) — Los eventos de streaming incluyen:
thinking(razonamiento del agente),tool_call/tool_use(cuando usa una herramienta),token(texto generado progresivamente), ydone(finalización) — permitiendo al frontend mostrar el proceso completo en tiempo real.