LangGraphSeniorSystem design

Как слой персистентности LangGraph обеспечивает долгосрочные диалоги между сессиями?

LangGraph сохраняет состояние графа после каждого шага через checkpointer (MemorySaver, SqliteSaver, AsyncPostgresSaver). При повторном вызове с тем же thread_id граф автоматически восстанавливает историю диалога из хранилища.

Слой персистентности LangGraph: долгосрочные диалоги

LangGraph реализует долгосрочную память через механизм checkpointer — объект, который сериализует граф-состояние после каждого шага и восстанавливает его при новом вызове. Контекст диалога привязывается к thread_id, передаваемому в config. Это позволяет возобновить любую ветку разговора спустя произвольное время.

Встроенные backends

  • MemorySaver — хранит checkpoints в RAM; подходит только для разработки и тестов.
  • SqliteSaver — персистентное хранение в SQLite; минимальная зависимость, подходит для небольших сервисов.
  • AsyncSqliteSaver — асинхронная версия SQLiteSaver для async-графов.
  • PostgresSaver / AsyncPostgresSaver — production-backend из пакета langgraph-checkpoint-postgres; поддерживает конкурентный доступ и горизонтальное масштабирование.

Как работает checkpoint

Каждый checkpoint содержит:

  • channel_values — сериализованное состояние всех каналов графа.
  • channel_versions — версии каналов (используются для incremental updates).
  • metadata — step, source, write, parents.
  • pending_sends — незавершённые команды Send (для fan-out паттернов).

Пример: PostgresSaver с долгим диалогом

import asyncio
from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

PG_URI = "postgresql://user:pass@localhost:5432/langgraph_db"

async def call_model(state: MessagesState):
    llm = ChatOpenAI(model="gpt-4o")
    response = await llm.ainvoke(state["messages"])
    return {"messages": [response]}

async def main():
    builder = StateGraph(MessagesState)
    builder.add_node("chat", call_model)
    builder.set_entry_point("chat")
    builder.set_finish_point("chat")

    async with AsyncPostgresSaver.from_conn_string(PG_URI) as checkpointer:
        await checkpointer.setup()  # создаёт таблицы при первом запуске
        graph = builder.compile(checkpointer=checkpointer)

        config = {"configurable": {"thread_id": "user-42-session"}}

        # Первый вызов
        result1 = await graph.ainvoke(
            {"messages": [HumanMessage(content="Что такое pgvector?")]},
            config=config
        )
        print(result1["messages"][-1].content)

        # Второй вызов — граф автоматически загрузит предыдущее состояние
        result2 = await graph.ainvoke(
            {"messages": [HumanMessage(content="Покажи пример создания индекса.")]},
            config=config
        )
        print(result2["messages"][-1].content)

asyncio.run(main())

Управление историей: trim и filter

Накопление сообщений ведёт к переполнению контекстного окна. LangGraph предоставляет утилиты для обрезки:

from langchain_core.messages import trim_messages

async def call_model_trimmed(state: MessagesState):
    llm = ChatOpenAI(model="gpt-4o")
    trimmed = trim_messages(
        state["messages"],
        max_tokens=4000,
        strategy="last",
        token_counter=llm,
        include_system=True,
        allow_partial=False,
    )
    response = await llm.ainvoke(trimmed)
    return {"messages": [response]}

Cross-thread memory (долгосрочный профиль)

Для хранения данных между разными thread_id (например, профиль пользователя) используется Store API (LangGraph Platform) или кастомный узел, который пишет в отдельную таблицу. В InMemoryStore / AsyncInMemoryStore данные хранятся в namespace-ключах:

from langgraph.store.memory import InMemoryStore
from langgraph.graph import StateGraph

store = InMemoryStore()

async def remember_user(state, *, store):
    namespace = ("user", state["user_id"])
    existing = await store.aget(namespace, "profile")
    profile = existing.value if existing else {}
    profile["last_topic"] = state["messages"][-1].content[:50]
    await store.aput(namespace, "profile", profile)
    return {}

Подводные камни

  • Пропуск checkpointer.setup() — DDL-таблицы не создаются, первый checkpoint падает с ошибкой об отсутствующей таблице.
  • Бесконтрольный рост messages — без trim_messages состояние экспоненциально растёт; токены заканчиваются, запрос к LLM падает.
  • MemorySaver в production — при перезапуске процесса все диалоги теряются; пользователи видят «потерю памяти» агента.
  • Один thread_id для нескольких пользователей — история смешивается; всегда генерируйте уникальный thread_id per user-session.
  • Большие бинарные объекты в state — изображения, PDF в base64 резко раздувают размер checkpoint и замедляют PostgreSQL.
  • Конкурентные записи в один thread — два параллельных вызова с одним thread_id могут перезаписать друг друга; используйте блокировки или разные thread-id для параллельных веток.
  • Версионирование схемы состояния — после изменения TypedDict-полей старые checkpoints могут не десериализоваться; нужна явная миграция (см. отдельный вопрос по версионированию).
  • Отсутствие TTL на checkpoints — без очистки устаревших записей таблица checkpoints неограниченно растёт; добавляйте cron-задачу или партиционирование по дате.

Common mistakes

  • Объяснять persistence long dialogs только синтаксисом без shape, dtype, состояния или режима выполнения.
  • Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
  • Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.

What the interviewer is testing

  • Может ли связать persistence long dialogs с реальным контрактом входов и выходов.
  • Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
  • Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.
  • Предлагает ли observability, rollback, ограничения стоимости и стратегию incident replay.

Sources

Related topics

Как слой персистентности LangGraph обеспечивает долгосрочные диалоги между сессиями? | Talanto