LangGraphMiddleCoding

Как реализовать паттерн map-reduce с помощью LangGraph для параллельной обработки нескольких документов?

Map-reduce в LangGraph реализуется через Send-API: map-узел генерирует список Send-объектов с индивидуальными payload-ами для каждого документа, параллельные узлы обрабатывают их независимо, а reduce-узел с Annotated-reducer собирает результаты.

Паттерн Map-Reduce в LangGraph

LangGraph реализует map-reduce через Send API: вместо статических рёбер узел-маппер возвращает список объектов Send(node_name, state), каждый из которых запускает отдельный экземпляр целевого узла параллельно. Reducer в TypedDict собирает результаты.

Полный пример: параллельный анализ документов

import operator
from typing import Annotated
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

llm = ChatOpenAI(model="gpt-4o-mini")

# --- Схемы состояний ---

class OverallState(TypedDict):
    documents: list[str]          # входные документы
    summaries: Annotated[list[str], operator.add]  # reducer: накапливает результаты
    final_report: str

class DocumentState(TypedDict):
    """State для каждого параллельного map-узла."""
    document: str
    summaries: Annotated[list[str], operator.add]

# --- Узлы ---

def map_documents(state: OverallState) -> list[Send]:
    """Map: создаёт отдельный Send для каждого документа."""
    return [
        Send("summarize_doc", {"document": doc, "summaries": []})
        for doc in state["documents"]
    ]

def summarize_doc(state: DocumentState) -> dict:
    """Обрабатывает один документ — выполняется параллельно для каждого."""
    response = llm.invoke([
        HumanMessage(f"Кратко суммируй этот текст в 1-2 предложениях:\n\n{state['document']}")
    ])
    return {"summaries": [response.content]}

def reduce_summaries(state: OverallState) -> dict:
    """Reduce: объединяет все summaries в финальный отчёт."""
    combined = "\n".join(f"{i+1}. {s}" for i, s in enumerate(state["summaries"]))
    response = llm.invoke([
        HumanMessage(f"На основе этих краткий изложений напиши итоговый вывод:\n\n{combined}")
    ])
    return {"final_report": response.content}

# --- Сборка графа ---

builder = StateGraph(OverallState)
builder.add_node("summarize_doc", summarize_doc)
builder.add_node("reduce_summaries", reduce_summaries)

# START -> map_documents (функция-маппер возвращает список Send)
builder.add_conditional_edges(START, map_documents, ["summarize_doc"])
# После всех параллельных summarize_doc -> reduce
builder.add_edge("summarize_doc", "reduce_summaries")
builder.add_edge("reduce_summaries", END)

graph = builder.compile()

# --- Запуск ---

docs = [
    "Python — интерпретируемый язык программирования с динамической типизацией.",
    "FastAPI использует аннотации типов Python для автоматической генерации OpenAPI.",
    "PostgreSQL поддерживает расширения, включая pgvector для векторного поиска.",
]

result = graph.invoke({"documents": docs, "summaries": [], "final_report": ""})
print(result["final_report"])

Как работает параллелизм

Когда add_conditional_edges получает список Send объектов, LangGraph запускает все целевые узлы конкурентно (через asyncio или ThreadPoolExecutor в sync-режиме). Reducer Annotated[list[str], operator.add] гарантирует, что результаты всех параллельных веток объединяются в один список без гонок.

Вариант с async для высокой нагрузки

import asyncio

async def summarize_doc_async(state: DocumentState) -> dict:
    response = await llm.ainvoke([
        HumanMessage(f"Суммируй: {state['document']}")
    ])
    return {"summaries": [response.content]}

# Использовать ainvoke для параллельного выполнения
result = await graph.ainvoke({"documents": docs, "summaries": [], "final_report": ""})

Подводные камни

  • Без Annotated[list, operator.add] каждый параллельный узел перезапишет summaries вместо того чтобы добавить — получите только последний результат.
  • Порядок результатов в summaries недетерминирован при параллельном выполнении; если порядок важен, добавляйте индекс в payload и сортируйте в reduce-узле.
  • Send создаёт отдельный экземпляр State для каждой ветки — изменения в DocumentState не видны другим параллельным веткам (это хорошо, но неочевидно).
  • При большом числе документов (>100) все LLM-запросы стартуют одновременно — это может превысить rate limit OpenAI; добавьте семафор или батчинг.
  • Ошибка в одном map-узле приводит к отмене всего графа; оборачивайте summarize_doc в try/except и возвращайте fallback-значение.
  • recursion_limit считает суммарное число шагов, включая параллельные; для 50 документов установите лимит явно: graph.invoke(..., config={"recursion_limit": 200}).
  • Checkpoint-ы для Send-ветвлений хранят State каждой ветки отдельно — при большом числе документов это значительно увеличивает объём в checkpointer.
  • В sync-режиме параллелизм реализован через threads, что ограничено GIL для CPU-bound задач; для IO-bound (LLM API) это не проблема, но для CPU-heavy обработки используйте multiprocessing вне графа.

Common mistakes

  • Объяснять map reduce только синтаксисом без shape, dtype, состояния или режима выполнения.
  • Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
  • Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.

What the interviewer is testing

  • Может ли связать map reduce с реальным контрактом входов и выходов.
  • Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
  • Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.

Sources

Related topics

Как реализовать паттерн map-reduce с помощью LangGraph для параллельной обработки нескольких документов? | Talanto