LangGraphMiddleCoding
Как реализовать стриминг в LangGraph для отображения промежуточных шагов агента пользователю?
LangGraph поддерживает стриминг через graph.stream() / graph.astream() с режимами values, updates, messages и events; для промежуточных шагов агента используют режим messages или astream_events.
Стриминг в LangGraph
LangGraph предоставляет несколько режимов стриминга, которые выбираются параметром stream_mode. Это позволяет показывать пользователю прогресс выполнения агента в реальном времени.
Режимы стриминга
"values"— после каждого шага отдаёт полное состояние графа."updates"— отдаёт только дельту (что изменилось на этом шаге)."messages"— стримит токены LLM по мере генерации (требует LangChain-модели с поддержкой стриминга)."debug"— детализированные события для отладки.
Стриминг обновлений состояния (updates)
from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
def call_model(state: State) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(State)
builder.add_node("model", call_model)
builder.set_entry_point("model")
builder.add_edge("model", END)
graph = builder.compile()
# Синхронный стриминг шагов
for chunk in graph.stream(
{"messages": [HumanMessage(content="Привет!")]},
stream_mode="updates",
):
print(chunk) # {"model": {"messages": [AIMessage(...)]}}
Стриминг токенов LLM (messages mode)
import asyncio
from langchain_core.messages import HumanMessage
async def stream_tokens():
async for chunk, metadata in graph.astream(
{"messages": [HumanMessage(content="Расскажи о Python")]},
stream_mode="messages",
):
# chunk — AIMessageChunk с токенами
# metadata — {"langgraph_node": "model", ...}
if hasattr(chunk, "content") and chunk.content:
print(chunk.content, end="", flush=True)
asyncio.run(stream_tokens())
astream_events для полного контроля
async def stream_events():
async for event in graph.astream_events(
{"messages": [HumanMessage(content="Вопрос")]},
version="v2",
):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)
elif kind == "on_chain_start":
print(f"\n[START node: {event['name']}]")
elif kind == "on_chain_end":
print(f"\n[END node: {event['name']}]")
asyncio.run(stream_events())
Несколько режимов одновременно
async for chunk in graph.astream(
{"messages": [HumanMessage(content="test")]},
stream_mode=["messages", "updates"], # список режимов
):
mode, data = chunk # tuple (режим, данные)
print(f"{mode}: {data}")
Подводные камни
- streaming=True в модели — без этого флага в ChatOpenAI/ChatAnthropic режим "messages" вернёт единственный чанк в конце, а не поток.
- astream vs stream — в async-контексте (FastAPI, async handlers) используйте async-версии; вызов синхронного stream внутри asyncio вызывает блокировку event loop.
- thread_id при стриминге с checkpointer — нужно передавать
config={"configurable": {"thread_id": "..."}}}, иначе состояние не сохраняется. - Большой объём событий в astream_events — при длинных агентных цепочках количество событий огромно; фильтруйте по
event["name"]. - Кодировка в HTTP SSE — при отдаче через FastAPI StreamingResponse убедитесь, что каждый чанк заканчивается на
\n\nдля корректного SSE-протокола. - Отмена стрима — если клиент отключился, asyncio-генератор нужно явно закрывать через
aclose(), иначе LLM-запрос продолжится в фоне. - Режим values медленнее — он сериализует всё состояние на каждом шаге; для больших состояний предпочтительнее updates.
Common mistakes
- Объяснять
streaming intermediate stepsтолько синтаксисом без shape, dtype, состояния или режима выполнения. - Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
- Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.
What the interviewer is testing
- Может ли связать
streaming intermediate stepsс реальным контрактом входов и выходов. - Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
- Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.