Как слой персистентности LangGraph обеспечивает долгосрочные диалоги между сессиями?
LangGraph сохраняет состояние графа после каждого шага через checkpointer (MemorySaver, SqliteSaver, AsyncPostgresSaver). При повторном вызове с тем же thread_id граф автоматически восстанавливает историю диалога из хранилища.
Слой персистентности LangGraph: долгосрочные диалоги
LangGraph реализует долгосрочную память через механизм checkpointer — объект, который сериализует граф-состояние после каждого шага и восстанавливает его при новом вызове. Контекст диалога привязывается к thread_id, передаваемому в config. Это позволяет возобновить любую ветку разговора спустя произвольное время.
Встроенные backends
- MemorySaver — хранит checkpoints в RAM; подходит только для разработки и тестов.
- SqliteSaver — персистентное хранение в SQLite; минимальная зависимость, подходит для небольших сервисов.
- AsyncSqliteSaver — асинхронная версия SQLiteSaver для async-графов.
- PostgresSaver / AsyncPostgresSaver — production-backend из пакета
langgraph-checkpoint-postgres; поддерживает конкурентный доступ и горизонтальное масштабирование.
Как работает checkpoint
Каждый checkpoint содержит:
channel_values— сериализованное состояние всех каналов графа.channel_versions— версии каналов (используются для incremental updates).metadata— step, source, write, parents.pending_sends— незавершённые команды Send (для fan-out паттернов).
Пример: PostgresSaver с долгим диалогом
import asyncio
from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
PG_URI = "postgresql://user:pass@localhost:5432/langgraph_db"
async def call_model(state: MessagesState):
llm = ChatOpenAI(model="gpt-4o")
response = await llm.ainvoke(state["messages"])
return {"messages": [response]}
async def main():
builder = StateGraph(MessagesState)
builder.add_node("chat", call_model)
builder.set_entry_point("chat")
builder.set_finish_point("chat")
async with AsyncPostgresSaver.from_conn_string(PG_URI) as checkpointer:
await checkpointer.setup() # создаёт таблицы при первом запуске
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-42-session"}}
# Первый вызов
result1 = await graph.ainvoke(
{"messages": [HumanMessage(content="Что такое pgvector?")]},
config=config
)
print(result1["messages"][-1].content)
# Второй вызов — граф автоматически загрузит предыдущее состояние
result2 = await graph.ainvoke(
{"messages": [HumanMessage(content="Покажи пример создания индекса.")]},
config=config
)
print(result2["messages"][-1].content)
asyncio.run(main())
Управление историей: trim и filter
Накопление сообщений ведёт к переполнению контекстного окна. LangGraph предоставляет утилиты для обрезки:
from langchain_core.messages import trim_messages
async def call_model_trimmed(state: MessagesState):
llm = ChatOpenAI(model="gpt-4o")
trimmed = trim_messages(
state["messages"],
max_tokens=4000,
strategy="last",
token_counter=llm,
include_system=True,
allow_partial=False,
)
response = await llm.ainvoke(trimmed)
return {"messages": [response]}
Cross-thread memory (долгосрочный профиль)
Для хранения данных между разными thread_id (например, профиль пользователя) используется Store API (LangGraph Platform) или кастомный узел, который пишет в отдельную таблицу. В InMemoryStore / AsyncInMemoryStore данные хранятся в namespace-ключах:
from langgraph.store.memory import InMemoryStore
from langgraph.graph import StateGraph
store = InMemoryStore()
async def remember_user(state, *, store):
namespace = ("user", state["user_id"])
existing = await store.aget(namespace, "profile")
profile = existing.value if existing else {}
profile["last_topic"] = state["messages"][-1].content[:50]
await store.aput(namespace, "profile", profile)
return {}
Подводные камни
- Пропуск
checkpointer.setup()— DDL-таблицы не создаются, первый checkpoint падает с ошибкой об отсутствующей таблице. - Бесконтрольный рост
messages— без trim_messages состояние экспоненциально растёт; токены заканчиваются, запрос к LLM падает. - MemorySaver в production — при перезапуске процесса все диалоги теряются; пользователи видят «потерю памяти» агента.
- Один thread_id для нескольких пользователей — история смешивается; всегда генерируйте уникальный
thread_idper user-session. - Большие бинарные объекты в state — изображения, PDF в base64 резко раздувают размер checkpoint и замедляют PostgreSQL.
- Конкурентные записи в один thread — два параллельных вызова с одним
thread_idмогут перезаписать друг друга; используйте блокировки или разные thread-id для параллельных веток. - Версионирование схемы состояния — после изменения TypedDict-полей старые checkpoints могут не десериализоваться; нужна явная миграция (см. отдельный вопрос по версионированию).
- Отсутствие TTL на checkpoints — без очистки устаревших записей таблица checkpoints неограниченно растёт; добавляйте cron-задачу или партиционирование по дате.
Common mistakes
- Объяснять
persistence long dialogsтолько синтаксисом без shape, dtype, состояния или режима выполнения. - Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
- Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.
What the interviewer is testing
- Может ли связать
persistence long dialogsс реальным контрактом входов и выходов. - Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
- Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.
- Предлагает ли observability, rollback, ограничения стоимости и стратегию incident replay.