Python para IA: APIs, SDKs e Integraciones
Python como Lenguaje Central de AI Engineering
Python no es solo para scripting — en AI Engineering es el lenguaje de integración que conecta modelos, bases de datos, APIs y servicios cloud.
Diseño de APIs para IA con FastAPI
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional
import asyncio
app = FastAPI(title="RAG API", version="2.0")
# ── Schemas ──────────────────────────────────────────────
class QueryRequest(BaseModel):
question: str = Field(..., min_length=3, max_length=2000)
collection: str = Field(default="default")
max_results: int = Field(default=5, ge=1, le=20)
stream: bool = Field(default=False)
class QueryResponse(BaseModel):
answer: str
sources: list[dict]
tokens_used: int
latency_ms: float
model: str
class IndexRequest(BaseModel):
content: str = Field(..., min_length=10)
metadata: dict = Field(default_factory=dict)
source: str
# ── Dependencias ─────────────────────────────────────────
async def get_rag_service():
return RAGService() # Singleton con connection pool
async def verify_api_key(x_api_key: str = Header(...)):
if not is_valid_key(x_api_key):
raise HTTPException(status_code=401, detail="Invalid API key")
return x_api_key
# ── Endpoints ────────────────────────────────────────────
@app.post("/v1/query", response_model=QueryResponse)
async def query(
request: QueryRequest,
rag: RAGService = Depends(get_rag_service),
api_key: str = Depends(verify_api_key),
):
if request.stream:
return StreamingResponse(
rag.stream_answer(request.question, request.collection),
media_type="text/event-stream",
)
result = await rag.answer(
question=request.question,
collection=request.collection,
max_results=request.max_results,
)
return QueryResponse(**result)
@app.post("/v1/index")
async def index_document(
request: IndexRequest,
background_tasks: BackgroundTasks,
rag: RAGService = Depends(get_rag_service),
):
# Indexación en background para no bloquear
background_tasks.add_task(rag.index, request.content, request.metadata)
return {"status": "queued", "message": "Document will be indexed shortly"}
# ── Streaming con SSE ────────────────────────────────────
@app.post("/v1/stream")
async def stream_query(request: QueryRequest):
async def event_generator():
async for chunk in rag.stream_answer(request.question):
yield f"data: {json.dumps({'text': chunk})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
Middleware para Métricas
import time
from starlette.middleware.base import BaseHTTPMiddleware
class MetricsMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = (time.perf_counter() - start) * 1000
# Publicar métricas
if request.url.path.startswith("/v1/"):
await publish_metric("api_latency_ms", duration, {
"endpoint": request.url.path,
"method": request.method,
"status": response.status_code,
})
response.headers["X-Response-Time"] = f"{duration:.0f}ms"
return response
app.add_middleware(MetricsMiddleware)
Integración con SDKs de LLM
Cliente Unificado Multi-Proveedor
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class LLMResponse:
content: str
model: str
input_tokens: int
output_tokens: int
cost_usd: float
latency_ms: float
class LLMClient(ABC):
@abstractmethod
async def generate(self, messages: list[dict], **kwargs) -> LLMResponse:
pass
class OpenAIClient(LLMClient):
def __init__(self, model: str = "gpt-4o"):
from openai import AsyncOpenAI
self.client = AsyncOpenAI()
self.model = model
async def generate(self, messages, **kwargs) -> LLMResponse:
start = time.perf_counter()
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
**kwargs,
)
latency = (time.perf_counter() - start) * 1000
usage = response.usage
cost = self._calculate_cost(usage.prompt_tokens, usage.completion_tokens)
return LLMResponse(
content=response.choices[0].message.content,
model=self.model,
input_tokens=usage.prompt_tokens,
output_tokens=usage.completion_tokens,
cost_usd=cost,
latency_ms=latency,
)
class AnthropicClient(LLMClient):
def __init__(self, model: str = "claude-sonnet-4-20250514"):
from anthropic import AsyncAnthropic
self.client = AsyncAnthropic()
self.model = model
async def generate(self, messages, **kwargs) -> LLMResponse:
start = time.perf_counter()
response = await self.client.messages.create(
model=self.model,
messages=messages,
max_tokens=kwargs.get("max_tokens", 1024),
)
latency = (time.perf_counter() - start) * 1000
return LLMResponse(
content=response.content[0].text,
model=self.model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
cost_usd=self._calculate_cost(response.usage),
latency_ms=latency,
)
Patrones Async para IA
Procesamiento Concurrente de Documentos
import asyncio
from typing import AsyncIterator
async def process_documents_concurrent(
documents: list[str],
embed_fn,
max_concurrent: int = 10,
) -> list[list[float]]:
"""Procesa documentos en paralelo con semáforo."""
semaphore = asyncio.Semaphore(max_concurrent)
async def process_one(doc: str) -> list[float]:
async with semaphore:
return await embed_fn(doc)
tasks = [process_one(doc) for doc in documents]
return await asyncio.gather(*tasks)
Producer-Consumer con asyncio.Queue
async def indexing_pipeline(documents: list, batch_size: int = 50):
queue = asyncio.Queue(maxsize=100)
async def producer():
for doc in documents:
chunks = split_document(doc)
for chunk in chunks:
await queue.put(chunk)
await queue.put(None) # Señal de fin
async def consumer():
batch = []
while True:
chunk = await queue.get()
if chunk is None:
break
batch.append(chunk)
if len(batch) >= batch_size:
await index_batch(batch)
batch = []
if batch:
await index_batch(batch)
await asyncio.gather(producer(), consumer())
Rate Limiting y Retry
import asyncio
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
class RateLimiter:
def __init__(self, calls_per_minute: int):
self.semaphore = asyncio.Semaphore(calls_per_minute)
self.interval = 60.0 / calls_per_minute
async def acquire(self):
await self.semaphore.acquire()
# Truco semáforo + call_later: en vez de liberar al completar la llamada,
# libera después de `interval` segundos. Esto garantiza distribución uniforme
# de requests en el tiempo (ej: 60 RPM → 1 release cada 1s),
# sin importar cuánto dure cada llamada individual
asyncio.get_event_loop().call_later(self.interval, self.semaphore.release)
# Retry con backoff exponencial
@retry(
# wait_exponential: espera 2^n * multiplier segundos entre reintentos
# min=1s, max=60s → intento 1: 1s, 2: 2s, 3: 4s, 4: 8s (nunca > 60s)
wait=wait_exponential(multiplier=1, min=1, max=60),
# stop_after_attempt(5): máximo 5 reintentos, luego propaga la excepción
stop=stop_after_attempt(5),
# retry_if_exception_type: solo reintentar errores transitorios (red/timeout)
# NO reintentar AuthError, ValidationError, etc. (serían errores permanentes)
retry=retry_if_exception_type((TimeoutError, ConnectionError)),
)
async def call_llm_with_retry(client: LLMClient, messages: list) -> LLMResponse:
return await client.generate(messages)
Structured Output con Pydantic
from pydantic import BaseModel
from openai import OpenAI
class ExtractedEntity(BaseModel):
name: str
type: str # person, company, product
confidence: float
class AnalysisResult(BaseModel):
summary: str
entities: list[ExtractedEntity]
sentiment: str # positive, negative, neutral
key_topics: list[str]
client = OpenAI()
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[{"role": "user", "content": f"Analiza: {text}"}],
response_format=AnalysisResult,
)
result: AnalysisResult = response.choices[0].message.parsed
print(result.summary)
print(result.entities[0].name)
Resumen
Python para AI Engineering requiere dominar:
- FastAPI — APIs async de alto rendimiento con streaming
- Clientes multi-proveedor — Abstracciones sobre OpenAI, Anthropic, etc.
- Async patterns — Concurrencia, batching, producer-consumer
- Rate limiting + retry — Resiliencia ante APIs externas
- Structured output — Pydantic para respuestas tipadas de LLMs
🧠 Preguntas de Repaso
1. En un RateLimiter que protege contra el rate limit de OpenAI (60 RPM), ¿por qué se libera el semáforo después de un intervalo fijo en vez de al completar la solicitud?
- A) Porque las solicitudes siempre tardan el mismo tiempo
- B) Para garantizar distribución uniforme: liberar después de N segundos (ej: 1s para 60 RPM) asegura que nunca se excedan 60 calls/minuto, independientemente de cuánto tarde cada call
- C) Para reducir el uso de memoria
- D) Por una limitación de asyncio en Python
Respuesta: B) — Si se liberara al completar, múltiples solicitudes rápidas podrían exceder el rate limit. Liberando después de un intervalo fijo (1s para 60RPM), se garantiza una distribución uniforme de solicitudes en el tiempo.
2. ¿Qué formato usa FastAPI para streaming de respuestas de LLM y cuál es la señal de finalización?
- A) JSON con campo "complete: true"
- B) SSE (Server-Sent Events) con formato
data: {json}\n\ny señal de findata: [DONE]\n\n - C) WebSocket con mensaje tipo "close"
- D) HTTP chunked transfer con header "X-Complete"
Respuesta: B) — FastAPI usa
StreamingResponseconmedia_type="text/event-stream"(SSE). Cada chunk se envía comodata: {json}\n\ny la señal de finalización esdata: [DONE]\n\n.
3. ¿Qué ventaja ofrece client.beta.chat.completions.parse() con un modelo Pydantic como response_format?
- A) Es más rápido que la generación normal
- B) Retorna directamente una instancia tipada del modelo Pydantic en
response.choices[0].message.parsed, eliminando la necesidad de parsear JSON manualmente - C) Reduce el costo por token a la mitad
- D) Solo funciona con modelos GPT-4o-mini
Respuesta: B) — Structured Output con Pydantic permite que la API retorne directamente una instancia tipada del modelo (ej:
AnalysisResult) en.parsed, con campos validados y tipados, eliminando bugs de parsing manual de JSON.
4. En el patrón Producer-Consumer con asyncio.Queue, ¿cuál es la señal para indicar que el producer terminó?
- A) Cerrar la queue con
queue.close() - B) Enviar
Nonea la queue conawait queue.put(None) - C) Establecer una variable global
finished = True - D) El consumer detecta automáticamente cuando no hay más items
Respuesta: B) — La convención estándar es enviar
Nonecomo "poison pill" al final de la producción. El consumer verificaif item is None: breakpara saber que no hay más trabajo pendiente y debe terminar.