Как использовать middleware в LangChain v1 для auth, observability и enforcement политик?
Middleware в LangChain v0.3+ реализуется через RunnableLambda-обёртки, callbacks (BaseCallbackHandler), и LangSmith tracing — они покрывают auth, observability и policy enforcement без изменения бизнес-логики цепочки.
Middleware-паттерны в LangChain для auth, observability и policy enforcement
LangChain не имеет выделенного middleware API, но предоставляет три механизма, которые вместе закрывают все production-потребности: callbacks, LCEL-обёртки и LangSmith tracing.
Auth через RunnableLambda
Авторизацию удобно реализовать как первый шаг цепочки — RunnableLambda проверяет токен и прокидывает контекст дальше.
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
import jwt
SECRET = "my-secret"
def auth_middleware(inputs: dict) -> dict:
token = inputs.get("auth_token", "")
try:
payload = jwt.decode(token, SECRET, algorithms=["HS256"])
inputs["user_id"] = payload["sub"]
inputs["roles"] = payload.get("roles", [])
except jwt.ExpiredSignatureError:
raise PermissionError("Token expired")
except jwt.InvalidTokenError:
raise PermissionError("Invalid token")
return inputs
auth_step = RunnableLambda(auth_middleware)
Observability через BaseCallbackHandler
Callbacks — главный инструмент observability. BaseCallbackHandler перехватывает все события жизненного цикла цепочки.
from langchain_core.callbacks import BaseCallbackHandler
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class ObservabilityHandler(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
self._start = datetime.utcnow()
logger.info("chain_start", extra={"inputs_keys": list(inputs.keys())})
def on_chain_end(self, outputs, **kwargs):
elapsed = (datetime.utcnow() - self._start).total_seconds()
logger.info("chain_end", extra={"elapsed_s": elapsed})
def on_llm_start(self, serialized, prompts, **kwargs):
logger.info("llm_start", extra={"model": serialized.get("name")})
def on_llm_end(self, response, **kwargs):
usage = response.llm_output.get("token_usage", {})
logger.info("llm_end", extra={"tokens": usage})
def on_chain_error(self, error, **kwargs):
logger.error("chain_error", extra={"error": str(error)})
handler = ObservabilityHandler()
LangSmith — production observability out of the box
LangSmith трейсит все вызовы автоматически через переменные окружения.
# .env
# LANGCHAIN_TRACING_V2=true
# LANGCHAIN_API_KEY=ls__...
# LANGCHAIN_PROJECT=my-production-app
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls__your_key"
# Теперь все цепочки автоматически трейсятся
result = chain.invoke({"question": "hello"})
# Trace доступен на smith.langchain.com
Policy enforcement через callback
Enforcement политик (rate limiting, content filtering, quota) реализуется в on_llm_start / on_chain_start.
import redis
from langchain_core.callbacks import BaseCallbackHandler
class PolicyEnforcementHandler(BaseCallbackHandler):
def __init__(self, redis_client: redis.Redis, rpm_limit: int = 60):
self.redis = redis_client
self.rpm_limit = rpm_limit
def on_llm_start(self, serialized, prompts, run_id=None, **kwargs):
user_id = kwargs.get("metadata", {}).get("user_id", "anonymous")
key = f"rate:{user_id}:{datetime.utcnow().strftime('%Y%m%d%H%M')}"
count = self.redis.incr(key)
self.redis.expire(key, 120)
if count > self.rpm_limit:
raise RuntimeError(f"Rate limit exceeded for user {user_id}")
def on_llm_end(self, response, **kwargs):
# Можно логировать стоимость запроса
usage = response.llm_output.get("token_usage", {})
cost = usage.get("total_tokens", 0) * 0.000002
# Сохранить в БД для биллинга
Сборка полного middleware-стека
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
chain = auth_step | prompt | llm
# Передача callbacks при вызове
result = chain.invoke(
{"auth_token": "valid.jwt.here", "question": "What is LangChain?"},
config={
"callbacks": [ObservabilityHandler(), PolicyEnforcementHandler(redis_client)],
"metadata": {"user_id": "user_123"},
"tags": ["production", "v2"],
}
)
Подводные камни
- Callbacks не перехватывают исключения из RunnableLambda — on_chain_error срабатывает только для ошибок внутри LLM-шагов; оберните lambda в try/except самостоятельно.
- Состояние в CallbackHandler не потокобезопасно — self._start = ... создаёт гонку при конкурентных вызовах; используйте run_id как ключ словаря.
- Metadata не прокидывается в sub-chains автоматически — при вызове вложенных цепочек передавайте config явно через RunnableConfig.
- LangSmith увеличивает latency на 10–50ms — при LANGCHAIN_TRACING_V2=true трейсы отправляются синхронно; в production используйте batch mode через LANGCHAIN_TRACING_BATCH=true.
- Auth в RunnableLambda не защищает streaming — при .astream() auth шаг выполняется один раз в начале; повторно проверять токен нужно в on_llm_start callback.
- Порядок callbacks имеет значение — PolicyEnforcementHandler должен идти до ObservabilityHandler чтобы не логировать заблокированные запросы.
Common mistakes
- Отвечать определением без production-сценария.
- Не называть runtime boundary, security boundary или failure mode.
- Игнорировать версию API, observability и тестовую проверку.
What the interviewer is testing
- Объясняет механизм своими словами и без выдуманных API.
- Называет реальные риски, диагностику и критерий корректности.
- Связывает ответ с текущей документацией и миграционными ограничениями.