Inicio / Inteligencia Artificial / AI Engineering Pro / Python: APIs e Integraciones

Python: APIs e Integraciones

FastAPI para IA, multi-provider LLM client, async y structured output.

Avanzado
🔒 Solo lectura
📖

Estás en modo lectura

Puedes leer toda la lección, pero para marcar progreso, hacer ejercicios y ganar XP necesitas una cuenta Pro.

Desbloquear por $9/mes

Python para IA: APIs, SDKs e Integraciones

Python como Lenguaje Central de AI Engineering

Python no es solo para scripting — en AI Engineering es el lenguaje de integración que conecta modelos, bases de datos, APIs y servicios cloud.

Diseño de APIs para IA con FastAPI

from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional
import asyncio

app = FastAPI(title="RAG API", version="2.0")

# ── Schemas ──────────────────────────────────────────────
class QueryRequest(BaseModel):
    question: str = Field(..., min_length=3, max_length=2000)
    collection: str = Field(default="default")
    max_results: int = Field(default=5, ge=1, le=20)
    stream: bool = Field(default=False)

class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    tokens_used: int
    latency_ms: float
    model: str

class IndexRequest(BaseModel):
    content: str = Field(..., min_length=10)
    metadata: dict = Field(default_factory=dict)
    source: str

# ── Dependencias ─────────────────────────────────────────
async def get_rag_service():
    return RAGService()  # Singleton con connection pool

async def verify_api_key(x_api_key: str = Header(...)):
    if not is_valid_key(x_api_key):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key

# ── Endpoints ────────────────────────────────────────────
@app.post("/v1/query", response_model=QueryResponse)
async def query(
    request: QueryRequest,
    rag: RAGService = Depends(get_rag_service),
    api_key: str = Depends(verify_api_key),
):
    if request.stream:
        return StreamingResponse(
            rag.stream_answer(request.question, request.collection),
            media_type="text/event-stream",
        )

    result = await rag.answer(
        question=request.question,
        collection=request.collection,
        max_results=request.max_results,
    )
    return QueryResponse(**result)

@app.post("/v1/index")
async def index_document(
    request: IndexRequest,
    background_tasks: BackgroundTasks,
    rag: RAGService = Depends(get_rag_service),
):
    # Indexación en background para no bloquear
    background_tasks.add_task(rag.index, request.content, request.metadata)
    return {"status": "queued", "message": "Document will be indexed shortly"}

# ── Streaming con SSE ────────────────────────────────────
@app.post("/v1/stream")
async def stream_query(request: QueryRequest):
    async def event_generator():
        async for chunk in rag.stream_answer(request.question):
            yield f"data: {json.dumps({'text': chunk})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Middleware para Métricas

import time
from starlette.middleware.base import BaseHTTPMiddleware

class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = (time.perf_counter() - start) * 1000

        # Publicar métricas
        if request.url.path.startswith("/v1/"):
            await publish_metric("api_latency_ms", duration, {
                "endpoint": request.url.path,
                "method": request.method,
                "status": response.status_code,
            })

        response.headers["X-Response-Time"] = f"{duration:.0f}ms"
        return response

app.add_middleware(MetricsMiddleware)

Integración con SDKs de LLM

Cliente Unificado Multi-Proveedor

from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class LLMResponse:
    content: str
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: float

class LLMClient(ABC):
    @abstractmethod
    async def generate(self, messages: list[dict], **kwargs) -> LLMResponse:
        pass

class OpenAIClient(LLMClient):
    def __init__(self, model: str = "gpt-4o"):
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI()
        self.model = model

    async def generate(self, messages, **kwargs) -> LLMResponse:
        start = time.perf_counter()
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            **kwargs,
        )
        latency = (time.perf_counter() - start) * 1000

        usage = response.usage
        cost = self._calculate_cost(usage.prompt_tokens, usage.completion_tokens)

        return LLMResponse(
            content=response.choices[0].message.content,
            model=self.model,
            input_tokens=usage.prompt_tokens,
            output_tokens=usage.completion_tokens,
            cost_usd=cost,
            latency_ms=latency,
        )

class AnthropicClient(LLMClient):
    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        from anthropic import AsyncAnthropic
        self.client = AsyncAnthropic()
        self.model = model

    async def generate(self, messages, **kwargs) -> LLMResponse:
        start = time.perf_counter()
        response = await self.client.messages.create(
            model=self.model,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", 1024),
        )
        latency = (time.perf_counter() - start) * 1000

        return LLMResponse(
            content=response.content[0].text,
            model=self.model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cost_usd=self._calculate_cost(response.usage),
            latency_ms=latency,
        )

Patrones Async para IA

Procesamiento Concurrente de Documentos

import asyncio
from typing import AsyncIterator

async def process_documents_concurrent(
    documents: list[str],
    embed_fn,
    max_concurrent: int = 10,
) -> list[list[float]]:
    """Procesa documentos en paralelo con semáforo."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(doc: str) -> list[float]:
        async with semaphore:
            return await embed_fn(doc)

    tasks = [process_one(doc) for doc in documents]
    return await asyncio.gather(*tasks)

Producer-Consumer con asyncio.Queue

async def indexing_pipeline(documents: list, batch_size: int = 50):
    queue = asyncio.Queue(maxsize=100)

    async def producer():
        for doc in documents:
            chunks = split_document(doc)
            for chunk in chunks:
                await queue.put(chunk)
        await queue.put(None)  # Señal de fin

    async def consumer():
        batch = []
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            batch.append(chunk)
            if len(batch) >= batch_size:
                await index_batch(batch)
                batch = []
        if batch:
            await index_batch(batch)

    await asyncio.gather(producer(), consumer())

Rate Limiting y Retry

import asyncio
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

class RateLimiter:
    def __init__(self, calls_per_minute: int):
        self.semaphore = asyncio.Semaphore(calls_per_minute)
        self.interval = 60.0 / calls_per_minute

    async def acquire(self):
        await self.semaphore.acquire()
        # Truco semáforo + call_later: en vez de liberar al completar la llamada,
        # libera después de `interval` segundos. Esto garantiza distribución uniforme
        # de requests en el tiempo (ej: 60 RPM → 1 release cada 1s),
        # sin importar cuánto dure cada llamada individual
        asyncio.get_event_loop().call_later(self.interval, self.semaphore.release)

# Retry con backoff exponencial
@retry(
    # wait_exponential: espera 2^n * multiplier segundos entre reintentos
    # min=1s, max=60s → intento 1: 1s, 2: 2s, 3: 4s, 4: 8s (nunca > 60s)
    wait=wait_exponential(multiplier=1, min=1, max=60),
    # stop_after_attempt(5): máximo 5 reintentos, luego propaga la excepción
    stop=stop_after_attempt(5),
    # retry_if_exception_type: solo reintentar errores transitorios (red/timeout)
    # NO reintentar AuthError, ValidationError, etc. (serían errores permanentes)
    retry=retry_if_exception_type((TimeoutError, ConnectionError)),
)
async def call_llm_with_retry(client: LLMClient, messages: list) -> LLMResponse:
    return await client.generate(messages)

Structured Output con Pydantic

from pydantic import BaseModel
from openai import OpenAI

class ExtractedEntity(BaseModel):
    name: str
    type: str  # person, company, product
    confidence: float

class AnalysisResult(BaseModel):
    summary: str
    entities: list[ExtractedEntity]
    sentiment: str  # positive, negative, neutral
    key_topics: list[str]

client = OpenAI()

response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[{"role": "user", "content": f"Analiza: {text}"}],
    response_format=AnalysisResult,
)

result: AnalysisResult = response.choices[0].message.parsed
print(result.summary)
print(result.entities[0].name)

Resumen

Python para AI Engineering requiere dominar:

  1. FastAPI — APIs async de alto rendimiento con streaming
  2. Clientes multi-proveedor — Abstracciones sobre OpenAI, Anthropic, etc.
  3. Async patterns — Concurrencia, batching, producer-consumer
  4. Rate limiting + retry — Resiliencia ante APIs externas
  5. Structured output — Pydantic para respuestas tipadas de LLMs

🧠 Preguntas de Repaso

1. En un RateLimiter que protege contra el rate limit de OpenAI (60 RPM), ¿por qué se libera el semáforo después de un intervalo fijo en vez de al completar la solicitud?

  • A) Porque las solicitudes siempre tardan el mismo tiempo
  • B) Para garantizar distribución uniforme: liberar después de N segundos (ej: 1s para 60 RPM) asegura que nunca se excedan 60 calls/minuto, independientemente de cuánto tarde cada call
  • C) Para reducir el uso de memoria
  • D) Por una limitación de asyncio en Python

Respuesta: B) — Si se liberara al completar, múltiples solicitudes rápidas podrían exceder el rate limit. Liberando después de un intervalo fijo (1s para 60RPM), se garantiza una distribución uniforme de solicitudes en el tiempo.

2. ¿Qué formato usa FastAPI para streaming de respuestas de LLM y cuál es la señal de finalización?

  • A) JSON con campo "complete: true"
  • B) SSE (Server-Sent Events) con formato data: {json}\n\n y señal de fin data: [DONE]\n\n
  • C) WebSocket con mensaje tipo "close"
  • D) HTTP chunked transfer con header "X-Complete"

Respuesta: B) — FastAPI usa StreamingResponse con media_type="text/event-stream" (SSE). Cada chunk se envía como data: {json}\n\n y la señal de finalización es data: [DONE]\n\n.

3. ¿Qué ventaja ofrece client.beta.chat.completions.parse() con un modelo Pydantic como response_format?

  • A) Es más rápido que la generación normal
  • B) Retorna directamente una instancia tipada del modelo Pydantic en response.choices[0].message.parsed, eliminando la necesidad de parsear JSON manualmente
  • C) Reduce el costo por token a la mitad
  • D) Solo funciona con modelos GPT-4o-mini

Respuesta: B) — Structured Output con Pydantic permite que la API retorne directamente una instancia tipada del modelo (ej: AnalysisResult) en .parsed, con campos validados y tipados, eliminando bugs de parsing manual de JSON.

4. En el patrón Producer-Consumer con asyncio.Queue, ¿cuál es la señal para indicar que el producer terminó?

  • A) Cerrar la queue con queue.close()
  • B) Enviar None a la queue con await queue.put(None)
  • C) Establecer una variable global finished = True
  • D) El consumer detecta automáticamente cuando no hay más items

Respuesta: B) — La convención estándar es enviar None como "poison pill" al final de la producción. El consumer verifica if item is None: break para saber que no hay más trabajo pendiente y debe terminar.

¿Te gustó esta lección?

Con Pro puedes marcar progreso, hacer ejercicios, tomar quizzes, ganar XP y obtener tu constancia.

Ver planes desde $9/mes