FastAPISeniorSystem design

Как управлять транзакциями базы данных и откатами в FastAPI с SQLAlchemy?

Транзакции в FastAPI+SQLAlchemy управляются через dependency с yield: при успехе — commit, при исключении — rollback. Для частичных откатов используйте begin_nested() (SAVEPOINT).

Базовая модель: сессия через Dependency Injection

SQLAlchemy AsyncSession оборачивается в FastAPI dependency с yield. Транзакция начинается автоматически при первом запросе к БД и фиксируется (или откатывается) в блоке finally.

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from fastapi import Depends

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=10,
    max_overflow=20,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Использование в endpoint

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select

app = FastAPI()

@app.post("/transfer")
async def transfer(
    from_id: int,
    to_id: int,
    amount: float,
    db: AsyncSession = Depends(get_db),
):
    from_acc = await db.get(Account, from_id)
    to_acc = await db.get(Account, to_id)

    if from_acc.balance < amount:
        raise HTTPException(400, "Insufficient funds")  # автоматический rollback

    from_acc.balance -= amount
    to_acc.balance += amount
    # commit выполнится в dependency при успешном выходе из yield
    return {"status": "ok"}

Savepoints и вложенные транзакции

SQLAlchemy поддерживает begin_nested() для создания SAVEPOINT. Позволяет откатить часть операций без потери всей транзакции:

async def create_order_with_fallback(db: AsyncSession, order_data: dict):
    async with db.begin_nested() as nested:
        try:
            order = Order(**order_data)
            db.add(order)
            await db.flush()  # проверяем constraints без commit
            await send_notification(order.id)  # может упасть
        except Exception:
            await nested.rollback()  # откат только вложенной части
            order = Order(**order_data, notify=False)
            db.add(order)
            await db.flush()
    # внешняя транзакция продолжает работу

Явный контроль транзакций

@app.post("/batch")
async def batch_insert(items: list[ItemCreate], db: AsyncSession = Depends(get_db)):
    async with db.begin():  # явная транзакция
        for item in items:
            db.add(Item(**item.model_dump()))
    # commit вызывается автоматически при выходе из context manager
    return {"inserted": len(items)}

Isolation level

Для критичных операций (например, вычитание остатка) устанавливайте уровень изоляции на уровне соединения или движка:

engine = create_async_engine(
    DATABASE_URL,
    execution_options={"isolation_level": "REPEATABLE READ"},
)

# или для конкретного запроса:
async with engine.begin() as conn:
    await conn.execution_options(isolation_level="SERIALIZABLE")
    result = await conn.execute(select(Account).where(...))

Подводные камни

  • expire_on_commit=True (дефолт) — после commit все атрибуты объектов инвалидируются; при обращении к ним вне сессии возникает DetachedInstanceError. Используйте expire_on_commit=False в async-контексте.
  • Долгие транзакции — держать транзакцию открытой во время HTTP-вызова к внешнему сервису блокирует строки и истощает пул соединений.
  • await session.flush() vs commit()flush() отправляет SQL в БД но не фиксирует; данные видны только в текущей транзакции. Забытый flush перед чтением auto-increment ID — частая ошибка.
  • rollback при HTTPException — HTTPException — это обычный raise; dependency с except Exception: rollback корректно откатывает транзакцию, но только если exception не перехвачен раньше.
  • Несколько сессий в одном запросе — если два Depends используют разные сессии, они видят разные состояния БД; объединяйте их в одну сессию через общий Depends.
  • Savepoint не работает без begin()begin_nested() требует активной внешней транзакции; в autocommit-режиме вызов упадёт.
  • SELECT FOR UPDATE в async — используйте with_for_update() в SQLAlchemy, а не сырой SQL, чтобы избежать дедлоков при конкурентных обновлениях.

Common mistakes

  • Описывать database transactions только как термин и не показывать механизм на минимальном примере.
  • Игнорировать ошибки, пустые данные, конкурентный доступ или границы транзакции.
  • Не связывать поведение с официальным контрактом FastAPI и реальной эксплуатацией.

What the interviewer is testing

  • Объясняет database transactions через последовательность действий, а не через набор ключевых слов.
  • Приводит короткий кодовый пример или production-сценарий с ожидаемым поведением.
  • Называет хотя бы один риск: производительность, безопасность, транзакции, память или сопровождение.
  • Умеет обсудить отказ, наблюдаемость и rollback без изменения публичного контракта.

Sources

Related topics