LangGraphMiddleCoding

Что такое функция-редуктор add_messages и как она обрабатывает обновления состояния для списков сообщений?

add_messages — это редуктор LangGraph для поля состояния типа list[BaseMessage]. При обновлении он дописывает новые сообщения в конец, а не заменяет весь список. Дублирование по id предотвращается автоматически.

Что такое add_messages

add_messages — встроенный редуктор (reducer) LangGraph, предназначенный для поля состояния, хранящего список сообщений (list[BaseMessage]). Вместо того чтобы перезаписывать список целиком, он добавляет новые сообщения в конец уже накопленного состояния. Если пришло сообщение с уже существующим id, редуктор заменяет старый элемент — это обеспечивает идемпотентность при повторных запусках.

Как подключить

Редуктор указывается через аннотацию Annotated прямо в TypedDict-схеме состояния:

from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    # add_messages будет вызываться при каждом обновлении этого поля
    messages: Annotated[list[BaseMessage], add_messages]

def chat_node(state: State) -> dict:
    last = state["messages"][-1]
    reply = AIMessage(content=f"Вы написали: {last.content}")
    # Возвращаем только новое сообщение — редуктор сам допишет его в список
    return {"messages": [reply]}

builder = StateGraph(State)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
graph = builder.compile()

# Запуск
result = graph.invoke({"messages": [HumanMessage(content="Привет!")]})
for msg in result["messages"]:
    print(type(msg).__name__, ":", msg.content)
# HumanMessage : Привет!
# AIMessage    : Вы написали: Привет!

Механика слияния

Внутри add_messages работает следующий алгоритм:

  1. Берётся текущий список сообщений из состояния.
  2. Для каждого нового сообщения проверяется его id.
  3. Если id уже есть — старое сообщение заменяется (upsert).
  4. Если id новый — сообщение добавляется в конец.

Это означает, что узел может исправить предыдущее сообщение, вернув объект с тем же id, но изменённым содержимым — удобно для streaming-обновлений.

Использование с MessagesState

LangGraph предоставляет готовый базовый класс MessagesState, в котором поле messages уже аннотировано редуктором:

from langgraph.graph import MessagesState

# Эквивалентно TypedDict с Annotated[list[BaseMessage], add_messages]
class MyState(MessagesState):
    summary: str  # можно добавлять свои поля

Подводные камни

  • Без аннотации — полная перезапись. Если написать просто messages: list[BaseMessage] без Annotated[..., add_messages], каждое обновление узла будет заменять весь список, а не дополнять его.
  • Возврат dict, а не полного состояния. Узел должен возвращать только изменившиеся поля ({"messages": [new_msg]}). Если вернуть state целиком с уже обновлённым полем, редуктор применится повторно и сообщения продублируются.
  • Коллизии id при ручном создании сообщений. LangChain автогенерирует уникальные id, но если создавать сообщения с явным id, легко случайно передать одинаковый id — и старое сообщение будет молча перезаписано.
  • Неожиданный рост истории. В долгих агентских петлях список сообщений растёт бесконечно. Необходимо реализовать обрезку (trim) или суммаризацию истории, иначе контекстное окно LLM переполнится.
  • Тип элемента имеет значение. Редуктор принимает как BaseMessage, так и dict-представления сообщений, однако смешивание типов может вызвать ошибки сериализации при использовании checkpointer.
  • Параллельные узлы и порядок слияния. В fan-out / fan-in топологиях несколько узлов могут одновременно возвращать сообщения. Порядок их добавления в список определяется порядком завершения узлов, что может быть недетерминированным.
  • Несовместимость с простыми типами. add_messages рассчитан только на list[BaseMessage]. Попытка применить его к list[str] или другим типам вызовет ошибку или непредсказуемое поведение.
  • Checkpointer сериализует весь список. При использовании MemorySaver или SqliteSaver в checkpoint сохраняется полная история. Большой список существенно увеличивает размер snapshot и замедляет персистентность.

Common mistakes

  • Объяснять add messages reducer только синтаксисом без shape, dtype, состояния или режима выполнения.
  • Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
  • Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.

What the interviewer is testing

  • Может ли связать add messages reducer с реальным контрактом входов и выходов.
  • Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
  • Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.

Sources

Related topics