LangGraphMiddleCoding

Как реализовать стриминг в LangGraph для отображения промежуточных шагов агента пользователю?

LangGraph поддерживает стриминг через graph.stream() / graph.astream() с режимами values, updates, messages и events; для промежуточных шагов агента используют режим messages или astream_events.

Стриминг в LangGraph

LangGraph предоставляет несколько режимов стриминга, которые выбираются параметром stream_mode. Это позволяет показывать пользователю прогресс выполнения агента в реальном времени.

Режимы стриминга

  • "values" — после каждого шага отдаёт полное состояние графа.
  • "updates" — отдаёт только дельту (что изменилось на этом шаге).
  • "messages" — стримит токены LLM по мере генерации (требует LangChain-модели с поддержкой стриминга).
  • "debug" — детализированные события для отладки.

Стриминг обновлений состояния (updates)

from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph.message import add_messages


class State(TypedDict):
    messages: Annotated[list, add_messages]


llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)


def call_model(state: State) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}


builder = StateGraph(State)
builder.add_node("model", call_model)
builder.set_entry_point("model")
builder.add_edge("model", END)
graph = builder.compile()

# Синхронный стриминг шагов
for chunk in graph.stream(
    {"messages": [HumanMessage(content="Привет!")]},
    stream_mode="updates",
):
    print(chunk)  # {"model": {"messages": [AIMessage(...)]}}

Стриминг токенов LLM (messages mode)

import asyncio
from langchain_core.messages import HumanMessage


async def stream_tokens():
    async for chunk, metadata in graph.astream(
        {"messages": [HumanMessage(content="Расскажи о Python")]},
        stream_mode="messages",
    ):
        # chunk — AIMessageChunk с токенами
        # metadata — {"langgraph_node": "model", ...}
        if hasattr(chunk, "content") and chunk.content:
            print(chunk.content, end="", flush=True)


asyncio.run(stream_tokens())

astream_events для полного контроля

async def stream_events():
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content="Вопрос")]},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="", flush=True)
        elif kind == "on_chain_start":
            print(f"\n[START node: {event['name']}]")
        elif kind == "on_chain_end":
            print(f"\n[END node: {event['name']}]")


asyncio.run(stream_events())

Несколько режимов одновременно

async for chunk in graph.astream(
    {"messages": [HumanMessage(content="test")]},
    stream_mode=["messages", "updates"],  # список режимов
):
    mode, data = chunk  # tuple (режим, данные)
    print(f"{mode}: {data}")

Подводные камни

  • streaming=True в модели — без этого флага в ChatOpenAI/ChatAnthropic режим "messages" вернёт единственный чанк в конце, а не поток.
  • astream vs stream — в async-контексте (FastAPI, async handlers) используйте async-версии; вызов синхронного stream внутри asyncio вызывает блокировку event loop.
  • thread_id при стриминге с checkpointer — нужно передавать config={"configurable": {"thread_id": "..."}}}, иначе состояние не сохраняется.
  • Большой объём событий в astream_events — при длинных агентных цепочках количество событий огромно; фильтруйте по event["name"].
  • Кодировка в HTTP SSE — при отдаче через FastAPI StreamingResponse убедитесь, что каждый чанк заканчивается на \n\n для корректного SSE-протокола.
  • Отмена стрима — если клиент отключился, asyncio-генератор нужно явно закрывать через aclose(), иначе LLM-запрос продолжится в фоне.
  • Режим values медленнее — он сериализует всё состояние на каждом шаге; для больших состояний предпочтительнее updates.

Common mistakes

  • Объяснять streaming intermediate steps только синтаксисом без shape, dtype, состояния или режима выполнения.
  • Игнорировать leakage, воспроизводимость, пустые входы и скрытые копии данных.
  • Не проверять production-симптомы: latency, память, ретраи, дрейф качества и несовпадение версий.

What the interviewer is testing

  • Может ли связать streaming intermediate steps с реальным контрактом входов и выходов.
  • Упоминает ли тесты, метрики, reproducibility и диагностику ошибок.
  • Видит ли различие между demo-кодом в ноутбуке и production-пайплайном.

Sources

Related topics