Как проектировать permissions и audit trail для human-in-the-loop agent workflow?
HITL в LangGraph строится через interrupt(): граф приостанавливается, API проверяет RBAC-права reviewer, после чего вызывается Command(resume=...). Audit trail пишется в append-only таблицу БД — state графа не подходит, так как он перезаписывается.
Permissions и audit trail для human-in-the-loop агентных воркфлоу
Human-in-the-loop (HITL) в LangGraph реализуется через interrupt: граф приостанавливается перед чувствительным действием и ждёт явного подтверждения от человека. Задача архитектора — определить, кто может подтверждать какие действия, и зафиксировать все решения в неизменяемом журнале.
Interrupt: базовый механизм
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from typing import TypedDict, Literal
class WorkflowState(TypedDict):
action: str
payload: dict
approved_by: str
audit_log: list
def sensitive_action_node(state: WorkflowState) -> dict:
# Граф останавливается здесь и возвращает управление вызывающему коду
decision = interrupt({
"action": state["action"],
"payload": state["payload"],
"message": "Требуется подтверждение операции"
})
# После resume() decision содержит ответ человека
return {
"approved_by": decision["reviewer_id"],
"audit_log": state["audit_log"] + [{
"event": "human_decision",
"action": state["action"],
"decision": decision["approved"],
"reviewer": decision["reviewer_id"],
"timestamp": decision["timestamp"]
}]
}
def execute_action(state: WorkflowState) -> dict:
# Выполняется только после подтверждения
return {"audit_log": state["audit_log"] + [{"event": "executed", "action": state["action"]}]}
def route_after_decision(state: WorkflowState) -> str:
last = state["audit_log"][-1]
return "execute" if last.get("decision") else "reject"
Permissions: ролевой контроль доступа
Реализуйте проверку прав в слое, который вызывает graph.invoke(Command(resume=...)). Граф сам не знает о ролях — это ответственность вашего API:
from fastapi import FastAPI, HTTPException, Depends
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import Command
import datetime
app = FastAPI()
PERMISSIONS = {
"deploy": ["senior_engineer", "team_lead", "ops"],
"delete_data": ["dpo", "admin"],
"send_bulk_email": ["marketing_lead", "admin"],
}
def check_permission(action: str, user_role: str) -> bool:
allowed_roles = PERMISSIONS.get(action, [])
return user_role in allowed_roles
@app.post("/workflows/{thread_id}/approve")
async def approve_action(
thread_id: str,
reviewer_id: str,
reviewer_role: str,
approved: bool,
action: str,
checkpointer: AsyncPostgresSaver = Depends(get_checkpointer)
):
if not check_permission(action, reviewer_role):
raise HTTPException(status_code=403, detail=f"Role '{reviewer_role}' cannot approve '{action}'")
graph = build_graph(checkpointer)
config = {"configurable": {"thread_id": thread_id}}
result = await graph.ainvoke(
Command(resume={
"approved": approved,
"reviewer_id": reviewer_id,
"timestamp": datetime.datetime.utcnow().isoformat()
}),
config=config
)
return result
Audit trail: неизменяемый журнал
Audit trail нельзя хранить только в state графа — state можно перезаписать. Используйте отдельную append-only таблицу:
import asyncpg
CREATE_AUDIT_TABLE = """
CREATE TABLE IF NOT EXISTS workflow_audit (
id BIGSERIAL PRIMARY KEY,
thread_id TEXT NOT NULL,
event_type TEXT NOT NULL,
actor_id TEXT,
actor_role TEXT,
action TEXT,
approved BOOLEAN,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX ON workflow_audit (thread_id);
CREATE INDEX ON workflow_audit (created_at);
"""
async def write_audit_event(
conn: asyncpg.Connection,
thread_id: str,
event_type: str,
actor_id: str,
actor_role: str,
action: str,
approved: bool,
metadata: dict
):
await conn.execute(
"""
INSERT INTO workflow_audit
(thread_id, event_type, actor_id, actor_role, action, approved, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""",
thread_id, event_type, actor_id, actor_role, action, approved,
metadata
)
Экспирация и тайм-аут на подтверждение
import asyncio
async def wait_for_approval_or_timeout(
thread_id: str,
timeout_seconds: int = 3600
) -> dict:
try:
return await asyncio.wait_for(
poll_for_resume(thread_id), # ваша функция опроса
timeout=timeout_seconds
)
except asyncio.TimeoutError:
# Автоматически отклоняем при истечении времени
return {"approved": False, "reviewer_id": "system", "reason": "timeout"}
Подводные камни
- Проверка прав только в UI — API endpoint resume должен валидировать роль; иначе любой с thread_id может одобрить действие.
- Audit log только в state — state перезаписывается при resume; write-once запись обязательна в отдельной таблице.
- Нет тайм-аута на interrupt — граф висит в состоянии interrupted вечно; добавляйте TTL и автоотклонение.
- Один approver может одобрить свой же запрос — если initiator и reviewer — один человек, 4-eyes-principle нарушен; проверяйте
initiator_id != reviewer_id. - Race condition при параллельных approve — два reviewer отправляют resume одновременно; первый Command(resume) выигрывает, второй игнорируется без ошибки — логируйте оба.
- Отсутствие уведомлений reviewers — без Slack/email нотификации граф просто висит; интегрируйте webhook при входе в interrupt-состояние.
- GDPR и PII в audit log —
metadataможет содержать персональные данные; применяйте field-level шифрование или маскирование. - Нет индекса по thread_id в audit — при тысячах событий запрос истории по thread станет full scan; добавляйте индексы при создании таблицы.
Common mistakes
- Отвечать определением без production-сценария.
- Не называть runtime boundary, security boundary или failure mode.
- Игнорировать версию API, observability и тестовую проверку.
What the interviewer is testing
- Объясняет механизм своими словами и без выдуманных API.
- Называет реальные риски, диагностику и критерий корректности.
- Связывает ответ с текущей документацией и миграционными ограничениями.