Как реализовать паттерн map-reduce с помощью LangGraph для параллельной обработки нескольких документов?
Map-reduce в LangGraph реализуется через Send-API: map-узел генерирует список Send-объектов с индивидуальными payload-ами для каждого документа, параллельные узлы обрабатывают их независимо, а reduce-узел с Annotated-reducer собирает результаты.
Паттерн Map-Reduce в LangGraph
LangGraph реализует map-reduce через Send API: вместо статических рёбер узел-маппер возвращает список объектов Send(node_name, state), каждый из которых запускает отдельный экземпляр целевого узла параллельно. Reducer в TypedDict собирает результаты.
Полный пример: параллельный анализ документов
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
llm = ChatOpenAI(model="gpt-4o-mini")
# --- Схемы состояний ---
class OverallState(TypedDict):
documents: list[str] # входные документы
summaries: Annotated[list[str], operator.add] # reducer: накапливает результаты
final_report: str
class DocumentState(TypedDict):
"""State для каждого параллельного map-узла."""
document: str
summaries: Annotated[list[str], operator.add]
# --- Узлы ---
def map_documents(state: OverallState) -> list[Send]:
"""Map: создаёт отдельный Send для каждого документа."""
return [
Send("summarize_doc", {"document": doc, "summaries": []})
for doc in state["documents"]
]
def summarize_doc(state: DocumentState) -> dict:
"""Обрабатывает один документ — выполняется параллельно для каждого."""
response = llm.invoke([
HumanMessage(f"Кратко суммируй этот текст в 1-2 предложениях:\n\n{state['document']}")
])
return {"summaries": [response.content]}
def reduce_summaries(state: OverallState) -> dict:
"""Reduce: объединяет все summaries в финальный отчёт."""
combined = "\n".join(f"{i+1}. {s}" for i, s in enumerate(state["summaries"]))
response = llm.invoke([
HumanMessage(f"На основе этих краткий изложений напиши итоговый вывод:\n\n{combined}")
])
return {"final_report": response.content}
# --- Сборка графа ---
builder = StateGraph(OverallState)
builder.add_node("summarize_doc", summarize_doc)
builder.add_node("reduce_summaries", reduce_summaries)
# START -> map_documents (функция-маппер возвращает список Send)
builder.add_conditional_edges(START, map_documents, ["summarize_doc"])
# После всех параллельных summarize_doc -> reduce
builder.add_edge("summarize_doc", "reduce_summaries")
builder.add_edge("reduce_summaries", END)
graph = builder.compile()
# --- Запуск ---
docs = [
"Python — интерпретируемый язык программирования с динамической типизацией.",
"FastAPI использует аннотации типов Python для автоматической генерации OpenAPI.",
"PostgreSQL поддерживает расширения, включая pgvector для векторного поиска.",
]
result = graph.invoke({"documents": docs, "summaries": [], "final_report": ""})
print(result["final_report"])
Как работает параллелизм
Когда add_conditional_edges получает список Send объектов, LangGraph запускает все целевые узлы конкурентно (через asyncio или ThreadPoolExecutor в sync-режиме). Reducer Annotated[list[str], operator.add] гарантирует, что результаты всех параллельных веток объединяются в один список без гонок.
Вариант с async для высокой нагрузки
import asyncio
async def summarize_doc_async(state: DocumentState) -> dict:
response = await llm.ainvoke([
HumanMessage(f"Суммируй: {state['document']}")
])
return {"summaries": [response.content]}
# Использовать ainvoke для параллельного выполнения
result = await graph.ainvoke({"documents": docs, "summaries": [], "final_report": ""})
Подводные камни
- Без
Annotated[list, operator.add]каждый параллельный узел перезапишетsummariesвместо того чтобы добавить — получите только последний результат. - Порядок результатов в
summariesнедетерминирован при параллельном выполнении; если порядок важен, добавляйте индекс в payload и сортируйте в reduce-узле. Sendсоздаёт отдельный экземпляр State для каждой ветки — изменения вDocumentStateне видны другим параллельным веткам (это хорошо, но неочевидно).- При большом числе документов (>100) все LLM-запросы стартуют одновременно — это может превысить rate limit OpenAI; добавьте семафор или батчинг.
- Ошибка в одном map-узле приводит к отмене всего графа; оборачивайте
summarize_docв try/except и возвращайте fallback-значение. recursion_limitсчитает суммарное число шагов, включая параллельные; для 50 документов установите лимит явно:graph.invoke(..., config={"recursion_limit": 200}).- Checkpoint-ы для Send-ветвлений хранят State каждой ветки отдельно — при большом числе документов это значительно увеличивает объём в checkpointer.
- В sync-режиме параллелизм реализован через threads, что ограничено GIL для CPU-bound задач; для IO-bound (LLM API) это не проблема, но для CPU-heavy обработки используйте
multiprocessingвне графа.
Common mistakes
- Объяснять
map reduceтолько синтаксисом без shape, dtype, состояния или режима выполнения. - Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
- Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.
What the interviewer is testing
- Может ли связать
map reduceс реальным контрактом входов и выходов. - Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
- Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.