Inicio / Inteligencia Artificial / AI Engineering Pro / LangChain y Frameworks de Orquestación

LangChain y Frameworks de Orquestación

LCEL, LangGraph, streaming, tools y LangSmith observabilidad.

Avanzado
🔒 Solo lectura
📖

Estás en modo lectura

Puedes leer toda la lección, pero para marcar progreso, hacer ejercicios y ganar XP necesitas una cuenta Pro.

Desbloquear por $9/mes

LangChain: Chains, Tools y Agent Frameworks

LangChain en 2026

LangChain se ha consolidado como el framework estándar de orquestación para aplicaciones de IA. Su arquitectura modular con LangChain Expression Language (LCEL) permite componer cadenas complejas de forma declarativa.

LCEL: LangChain Expression Language

from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_openai import ChatOpenAI

# Chain básica con LCEL
prompt = ChatPromptTemplate.from_template(
    "Eres un experto en {topic}. Responde: {question}"
)
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# El operador | (pipe) es el concepto central de LCEL.
# Conecta componentes en secuencia: la salida de uno es la entrada del siguiente.
# Flujo de datos: dict {topic, question} → prompt (formatea) → llm (genera AIMessage) → StrOutputParser (extrae string)
chain = prompt | llm | StrOutputParser()  # StrOutputParser() extrae el texto plano del AIMessage del LLM
result = chain.invoke({"topic": "RAG", "question": "¿Qué es HNSW?"})

Composición de Chains

# Ejecución paralela
parallel_chain = RunnableParallel(
    summary=summary_chain,
    keywords=keyword_chain,
    sentiment=sentiment_chain,
)

# Ejecución secuencial con transformaciones
sequential = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Branching condicional
from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: x["type"] == "technical", technical_chain),
    (lambda x: x["type"] == "business", business_chain),
    default_chain,  # Fallback
)

Streaming

# Streaming de respuestas token por token
async for chunk in chain.astream({"question": "Explica RAG"}):
    print(chunk, end="", flush=True)

# Streaming con eventos (para UI)
async for event in chain.astream_events(input, version="v2"):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
    elif event["event"] == "on_retriever_end":
        print(f"\n[Recuperados {len(event['data']['output'])} documentos]")

LangGraph: Agentes como Grafos

LangGraph es la evolución de los agentes en LangChain — modela workflows como grafos de estado.

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from typing import TypedDict, Annotated
import operator

# 1. Definir estado
class AgentState(TypedDict):
    # Annotated[list, operator.add] = patrón "reducer":
    # Cuando un nodo retorna {"messages": [nuevo_msg]}, en vez de REEMPLAZAR la lista,
    # operator.add la ACUMULA (append). Sin esto, cada nodo sobreescribiría el historial.
    messages: Annotated[list, operator.add]
    iteration: int  # Este campo NO tiene reducer, así que cada retorno lo reemplaza

# 2. Definir nodos
def agent_node(state: AgentState):
    """El LLM decide qué hacer."""
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response], "iteration": state["iteration"] + 1}

def should_continue(state: AgentState):
    """Decide si el agente debe continuar o terminar."""
    last_message = state["messages"][-1]
    # Lógica del edge condicional:
    # Si el LLM solicitó tool_calls → ir al nodo "tools" para ejecutarlas
    # Si no hay tool_calls → el LLM ya tiene la respuesta final, ir a END
    if last_message.tool_calls:
        return "tools"
    return "end"

# 3. Construir grafo
# StateGraph modela el workflow como un grafo dirigido donde:
# - Nodos = funciones que transforman el estado
# - Edges = transiciones entre nodos (pueden ser condicionales)
graph = StateGraph(AgentState)

graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools=[search_tool, sql_tool, api_tool]))

graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_continue, {
    "tools": "tools",
    "end": END,
})
graph.add_edge("tools", "agent")  # Después de tools, volver al agente

# 4. Compilar y ejecutar
# compile() "congela" la definición del grafo y genera un ejecutable optimizado.
# Valida que no haya nodos sin conectar, loops sin salida, etc.
app = graph.compile()

result = app.invoke({
    "messages": [{"role": "user", "content": "Busca las ventas de ayer y genera un reporte"}],
    "iteration": 0,
})

Checkpointing y Persistencia

from langgraph.checkpoint.sqlite import SqliteSaver

# Persistir estado para conversaciones multi-turno
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")

app = graph.compile(checkpointer=checkpointer)

# Cada invocación puede continuar desde el estado anterior
config = {"configurable": {"thread_id": "user-123"}}

result1 = app.invoke({"messages": [msg1]}, config=config)
# ... el usuario responde ...
result2 = app.invoke({"messages": [msg2]}, config=config)
# El agente recuerda el contexto previo

Human-in-the-Loop

from langgraph.graph import StateGraph

graph = StateGraph(AgentState)

# Interrumpir antes de ejecutar acciones peligrosas
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["execute_action"],  # Pausa aquí
)

# Ejecución
result = app.invoke(input, config)
# → Se pausa antes de "execute_action"

# El humano revisa y aprueba
app.update_state(config, {"approved": True})
result = app.invoke(None, config)  # Continuar desde el checkpoint

Herramientas Custom

from langchain_core.tools import tool
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    query: str = Field(description="Consulta de búsqueda")
    limit: int = Field(default=5, description="Máximo de resultados")

@tool(args_schema=SearchInput)
def search_knowledge_base(query: str, limit: int = 5) -> str:
    """Busca en la base de conocimiento interna de la empresa.
    Usar para preguntas sobre políticas, procesos o documentación técnica."""
    results = vectorstore.similarity_search(query, k=limit)
    return "\n---\n".join(doc.page_content for doc in results)

@tool
def get_current_metrics(metric_name: str) -> str:
    """Obtiene métricas actuales del sistema de monitoreo.
    Métricas disponibles: latency_p99, error_rate, throughput, cost_daily."""
    metrics = fetch_from_prometheus(metric_name)
    return json.dumps(metrics)

# Usar con LLM
llm_with_tools = ChatOpenAI(model="gpt-4o").bind_tools(
    [search_knowledge_base, get_current_metrics]
)

Cadenas de Producción: Patrones Comunes

RAG con Fallback

from langchain_core.runnables import RunnableWithFallbacks

# Si el retriever principal falla, usar fallback
primary_rag = primary_retriever | format_docs | rag_prompt | llm
fallback_rag = fallback_retriever | format_docs | rag_prompt | llm
direct_llm = direct_prompt | llm  # Sin RAG como último recurso

robust_chain = primary_rag.with_fallbacks([fallback_rag, direct_llm])

Routing de Queries

from langchain_core.prompts import ChatPromptTemplate

router_prompt = ChatPromptTemplate.from_template("""
Clasifica la siguiente pregunta en una de estas categorías:
- "technical": Preguntas técnicas sobre APIs, código, arquitectura
- "billing": Preguntas sobre facturación, pagos, suscripciones  
- "general": Preguntas generales

Pregunta: {question}
Categoría:
""")

router_chain = router_prompt | llm | StrOutputParser()

full_chain = {
    "type": router_chain,
    "question": RunnablePassthrough(),
} | RunnableBranch(
    (lambda x: "technical" in x["type"], technical_rag_chain),
    (lambda x: "billing" in x["type"], billing_rag_chain),
    general_chain,
)

LangSmith: Observabilidad

import os
# Las 3 variables de entorno necesarias para LangSmith:
os.environ["LANGCHAIN_TRACING_V2"] = "true"       # Activa el tracing ("true" para habilitar, "false" para desactivar)
os.environ["LANGCHAIN_API_KEY"] = "ls__..."         # API key de LangSmith (empieza con "ls__")
os.environ["LANGCHAIN_PROJECT"] = "production-assistant"  # Nombre del proyecto — agrupa los traces en el dashboard

# Con estas variables configuradas, TODAS las invocaciones de LangChain/LangGraph
# se registran automáticamente sin cambiar código. La integración captura:
# inputs, outputs, latencia, tokens consumidos, costos estimados y errores.

# Evaluación programática
from langsmith import Client

client = Client()

# Crear dataset de evaluación
dataset = client.create_dataset("rag-eval")
client.create_examples(
    inputs=[{"question": "¿Cómo configuro JWT?"}],
    outputs=[{"answer": "Edita config/auth.js..."}],
    dataset_id=dataset.id,
)

# Evaluar
from langsmith.evaluation import evaluate

# evaluate() ejecuta tu cadena contra CADA ejemplo del dataset,
# aplica los evaluators a cada par (predicción, referencia),
# y genera un reporte con scores agregados en el dashboard.
results = evaluate(
    lambda inputs: chain.invoke(inputs["question"]),  # Función a evaluar
    data="rag-eval",                                   # Dataset de referencia
    evaluators=[correctness_evaluator, faithfulness_evaluator],  # Evaluadores (pueden ser LLM-as-judge o heurísticos)
)

Resumen

LangChain proporciona una base sólida para aplicaciones de IA:

  1. LCEL — Composición declarativa de cadenas
  2. LangGraph — Agentes como grafos de estado con checkpointing
  3. Tools — Herramientas custom con validación Pydantic
  4. Streaming — Respuestas en tiempo real para UI
  5. LangSmith — Observabilidad y evaluación en producción

🧠 Preguntas de Repaso

1. En LCEL (LangChain Expression Language), ¿qué hace el operador | (pipe)?

  • A) Ejecuta las operaciones en paralelo
  • B) Encadena pasos secuencialmente: la salida de uno es la entrada del siguiente
  • C) Filtra resultados que no cumplen una condición
  • D) Combina múltiples modelos en un ensemble

Respuesta: B) — El operador pipe | en LCEL encadena componentes de forma declarativa: dict → prompt → llm → OutputParser. La salida de cada paso se convierte automáticamente en la entrada del siguiente.

2. En LangGraph, ¿para qué sirve el patrón Annotated[list, operator.add] en el estado?

  • A) Para limitar el tamaño máximo de la lista
  • B) Para que los mensajes se acumulen (reducer add) en vez de sobrescribirse
  • C) Para ordenar la lista alfabéticamente
  • D) Para validar el tipo de datos de la lista

Respuesta: B) — El patrón reducer con operator.add indica que cada nodo AGREGA sus resultados a la lista existente en vez de reemplazarla. Sin reducer, los campos del estado se sobrescriben completamente.

3. ¿Cuál es la función de interrupt_before en LangGraph?

  • A) Detener la ejecución si se excede un timeout
  • B) Implementar Human-in-the-Loop, pausando antes de nodos peligrosos para que un humano apruebe la acción
  • C) Interrumpir la ejecución si el costo excede un umbral
  • D) Cancelar nodos que tardan más de 30 segundos

Respuesta: B)interrupt_before=["execute_action"] pausa la ejecución antes de nodos que realizan acciones peligrosas (como modificar datos), permitiendo que un humano revise y apruebe antes de continuar.

4. ¿Qué captura automáticamente LangSmith cuando se configura LANGCHAIN_TRACING_V2="true"?

  • A) Solo los errores del sistema
  • B) Inputs, outputs, latencia, tokens consumidos, costos y errores de cada paso de la cadena
  • C) Solo las respuestas finales del LLM
  • D) Métricas de uso de CPU y memoria

Respuesta: B) — LangSmith captura automáticamente trazas completas de cada ejecución, incluyendo inputs/outputs de cada componente, latencia por paso, tokens consumidos, costos estimados y errores, proporcionando observabilidad completa del pipeline.

¿Te gustó esta lección?

Con Pro puedes marcar progreso, hacer ejercicios, tomar quizzes, ganar XP y obtener tu constancia.

Ver planes desde $9/mes