LangChainSeniorTechnical

Как использовать middleware в LangChain v1 для auth, observability и enforcement политик?

Middleware в LangChain v0.3+ реализуется через RunnableLambda-обёртки, callbacks (BaseCallbackHandler), и LangSmith tracing — они покрывают auth, observability и policy enforcement без изменения бизнес-логики цепочки.

Middleware-паттерны в LangChain для auth, observability и policy enforcement

LangChain не имеет выделенного middleware API, но предоставляет три механизма, которые вместе закрывают все production-потребности: callbacks, LCEL-обёртки и LangSmith tracing.

Auth через RunnableLambda

Авторизацию удобно реализовать как первый шаг цепочки — RunnableLambda проверяет токен и прокидывает контекст дальше.

from langchain_core.runnables import RunnableLambda, RunnablePassthrough
import jwt

SECRET = "my-secret"

def auth_middleware(inputs: dict) -> dict:
    token = inputs.get("auth_token", "")
    try:
        payload = jwt.decode(token, SECRET, algorithms=["HS256"])
        inputs["user_id"] = payload["sub"]
        inputs["roles"] = payload.get("roles", [])
    except jwt.ExpiredSignatureError:
        raise PermissionError("Token expired")
    except jwt.InvalidTokenError:
        raise PermissionError("Invalid token")
    return inputs

auth_step = RunnableLambda(auth_middleware)

Observability через BaseCallbackHandler

Callbacks — главный инструмент observability. BaseCallbackHandler перехватывает все события жизненного цикла цепочки.

from langchain_core.callbacks import BaseCallbackHandler
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class ObservabilityHandler(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        self._start = datetime.utcnow()
        logger.info("chain_start", extra={"inputs_keys": list(inputs.keys())})

    def on_chain_end(self, outputs, **kwargs):
        elapsed = (datetime.utcnow() - self._start).total_seconds()
        logger.info("chain_end", extra={"elapsed_s": elapsed})

    def on_llm_start(self, serialized, prompts, **kwargs):
        logger.info("llm_start", extra={"model": serialized.get("name")})

    def on_llm_end(self, response, **kwargs):
        usage = response.llm_output.get("token_usage", {})
        logger.info("llm_end", extra={"tokens": usage})

    def on_chain_error(self, error, **kwargs):
        logger.error("chain_error", extra={"error": str(error)})

handler = ObservabilityHandler()

LangSmith — production observability out of the box

LangSmith трейсит все вызовы автоматически через переменные окружения.

# .env
# LANGCHAIN_TRACING_V2=true
# LANGCHAIN_API_KEY=ls__...
# LANGCHAIN_PROJECT=my-production-app

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls__your_key"

# Теперь все цепочки автоматически трейсятся
result = chain.invoke({"question": "hello"})
# Trace доступен на smith.langchain.com

Policy enforcement через callback

Enforcement политик (rate limiting, content filtering, quota) реализуется в on_llm_start / on_chain_start.

import redis
from langchain_core.callbacks import BaseCallbackHandler

class PolicyEnforcementHandler(BaseCallbackHandler):
    def __init__(self, redis_client: redis.Redis, rpm_limit: int = 60):
        self.redis = redis_client
        self.rpm_limit = rpm_limit

    def on_llm_start(self, serialized, prompts, run_id=None, **kwargs):
        user_id = kwargs.get("metadata", {}).get("user_id", "anonymous")
        key = f"rate:{user_id}:{datetime.utcnow().strftime('%Y%m%d%H%M')}"
        count = self.redis.incr(key)
        self.redis.expire(key, 120)
        if count > self.rpm_limit:
            raise RuntimeError(f"Rate limit exceeded for user {user_id}")

    def on_llm_end(self, response, **kwargs):
        # Можно логировать стоимость запроса
        usage = response.llm_output.get("token_usage", {})
        cost = usage.get("total_tokens", 0) * 0.000002
        # Сохранить в БД для биллинга

Сборка полного middleware-стека

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_messages([("human", "{question}")])

chain = auth_step | prompt | llm

# Передача callbacks при вызове
result = chain.invoke(
    {"auth_token": "valid.jwt.here", "question": "What is LangChain?"},
    config={
        "callbacks": [ObservabilityHandler(), PolicyEnforcementHandler(redis_client)],
        "metadata": {"user_id": "user_123"},
        "tags": ["production", "v2"],
    }
)

Подводные камни

  • Callbacks не перехватывают исключения из RunnableLambda — on_chain_error срабатывает только для ошибок внутри LLM-шагов; оберните lambda в try/except самостоятельно.
  • Состояние в CallbackHandler не потокобезопасно — self._start = ... создаёт гонку при конкурентных вызовах; используйте run_id как ключ словаря.
  • Metadata не прокидывается в sub-chains автоматически — при вызове вложенных цепочек передавайте config явно через RunnableConfig.
  • LangSmith увеличивает latency на 10–50ms — при LANGCHAIN_TRACING_V2=true трейсы отправляются синхронно; в production используйте batch mode через LANGCHAIN_TRACING_BATCH=true.
  • Auth в RunnableLambda не защищает streaming — при .astream() auth шаг выполняется один раз в начале; повторно проверять токен нужно в on_llm_start callback.
  • Порядок callbacks имеет значение — PolicyEnforcementHandler должен идти до ObservabilityHandler чтобы не логировать заблокированные запросы.

Common mistakes

  • Отвечать определением без production-сценария.
  • Не называть runtime boundary, security boundary или failure mode.
  • Игнорировать версию API, observability и тестовую проверку.

What the interviewer is testing

  • Объясняет механизм своими словами и без выдуманных API.
  • Называет реальные риски, диагностику и критерий корректности.
  • Связывает ответ с текущей документацией и миграционными ограничениями.

Sources

Related topics