LangGraphMiddleSystem design

Как проектировать permissions и audit trail для human-in-the-loop agent workflow?

HITL в LangGraph строится через interrupt(): граф приостанавливается, API проверяет RBAC-права reviewer, после чего вызывается Command(resume=...). Audit trail пишется в append-only таблицу БД — state графа не подходит, так как он перезаписывается.

Permissions и audit trail для human-in-the-loop агентных воркфлоу

Human-in-the-loop (HITL) в LangGraph реализуется через interrupt: граф приостанавливается перед чувствительным действием и ждёт явного подтверждения от человека. Задача архитектора — определить, кто может подтверждать какие действия, и зафиксировать все решения в неизменяемом журнале.

Interrupt: базовый механизм

from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from typing import TypedDict, Literal

class WorkflowState(TypedDict):
    action: str
    payload: dict
    approved_by: str
    audit_log: list

def sensitive_action_node(state: WorkflowState) -> dict:
    # Граф останавливается здесь и возвращает управление вызывающему коду
    decision = interrupt({
        "action": state["action"],
        "payload": state["payload"],
        "message": "Требуется подтверждение операции"
    })
    # После resume() decision содержит ответ человека
    return {
        "approved_by": decision["reviewer_id"],
        "audit_log": state["audit_log"] + [{
            "event": "human_decision",
            "action": state["action"],
            "decision": decision["approved"],
            "reviewer": decision["reviewer_id"],
            "timestamp": decision["timestamp"]
        }]
    }

def execute_action(state: WorkflowState) -> dict:
    # Выполняется только после подтверждения
    return {"audit_log": state["audit_log"] + [{"event": "executed", "action": state["action"]}]}

def route_after_decision(state: WorkflowState) -> str:
    last = state["audit_log"][-1]
    return "execute" if last.get("decision") else "reject"

Permissions: ролевой контроль доступа

Реализуйте проверку прав в слое, который вызывает graph.invoke(Command(resume=...)). Граф сам не знает о ролях — это ответственность вашего API:

from fastapi import FastAPI, HTTPException, Depends
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import Command
import datetime

app = FastAPI()

PERMISSIONS = {
    "deploy": ["senior_engineer", "team_lead", "ops"],
    "delete_data": ["dpo", "admin"],
    "send_bulk_email": ["marketing_lead", "admin"],
}

def check_permission(action: str, user_role: str) -> bool:
    allowed_roles = PERMISSIONS.get(action, [])
    return user_role in allowed_roles

@app.post("/workflows/{thread_id}/approve")
async def approve_action(
    thread_id: str,
    reviewer_id: str,
    reviewer_role: str,
    approved: bool,
    action: str,
    checkpointer: AsyncPostgresSaver = Depends(get_checkpointer)
):
    if not check_permission(action, reviewer_role):
        raise HTTPException(status_code=403, detail=f"Role '{reviewer_role}' cannot approve '{action}'")

    graph = build_graph(checkpointer)
    config = {"configurable": {"thread_id": thread_id}}

    result = await graph.ainvoke(
        Command(resume={
            "approved": approved,
            "reviewer_id": reviewer_id,
            "timestamp": datetime.datetime.utcnow().isoformat()
        }),
        config=config
    )
    return result

Audit trail: неизменяемый журнал

Audit trail нельзя хранить только в state графа — state можно перезаписать. Используйте отдельную append-only таблицу:

import asyncpg

CREATE_AUDIT_TABLE = """
CREATE TABLE IF NOT EXISTS workflow_audit (
    id BIGSERIAL PRIMARY KEY,
    thread_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    actor_id TEXT,
    actor_role TEXT,
    action TEXT,
    approved BOOLEAN,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX ON workflow_audit (thread_id);
CREATE INDEX ON workflow_audit (created_at);
"""

async def write_audit_event(
    conn: asyncpg.Connection,
    thread_id: str,
    event_type: str,
    actor_id: str,
    actor_role: str,
    action: str,
    approved: bool,
    metadata: dict
):
    await conn.execute(
        """
        INSERT INTO workflow_audit
            (thread_id, event_type, actor_id, actor_role, action, approved, metadata)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        """,
        thread_id, event_type, actor_id, actor_role, action, approved,
        metadata
    )

Экспирация и тайм-аут на подтверждение

import asyncio

async def wait_for_approval_or_timeout(
    thread_id: str,
    timeout_seconds: int = 3600
) -> dict:
    try:
        return await asyncio.wait_for(
            poll_for_resume(thread_id),  # ваша функция опроса
            timeout=timeout_seconds
        )
    except asyncio.TimeoutError:
        # Автоматически отклоняем при истечении времени
        return {"approved": False, "reviewer_id": "system", "reason": "timeout"}

Подводные камни

  • Проверка прав только в UI — API endpoint resume должен валидировать роль; иначе любой с thread_id может одобрить действие.
  • Audit log только в state — state перезаписывается при resume; write-once запись обязательна в отдельной таблице.
  • Нет тайм-аута на interrupt — граф висит в состоянии interrupted вечно; добавляйте TTL и автоотклонение.
  • Один approver может одобрить свой же запрос — если initiator и reviewer — один человек, 4-eyes-principle нарушен; проверяйте initiator_id != reviewer_id.
  • Race condition при параллельных approve — два reviewer отправляют resume одновременно; первый Command(resume) выигрывает, второй игнорируется без ошибки — логируйте оба.
  • Отсутствие уведомлений reviewers — без Slack/email нотификации граф просто висит; интегрируйте webhook при входе в interrupt-состояние.
  • GDPR и PII в audit logmetadata может содержать персональные данные; применяйте field-level шифрование или маскирование.
  • Нет индекса по thread_id в audit — при тысячах событий запрос истории по thread станет full scan; добавляйте индексы при создании таблицы.

Common mistakes

  • Отвечать определением без production-сценария.
  • Не называть runtime boundary, security boundary или failure mode.
  • Игнорировать версию API, observability и тестовую проверку.

What the interviewer is testing

  • Объясняет механизм своими словами и без выдуманных API.
  • Называет реальные риски, диагностику и критерий корректности.
  • Связывает ответ с текущей документацией и миграционными ограничениями.

Sources

Related topics

Как проектировать permissions и audit trail для human-in-the-loop agent workflow? | Talanto