FastAPISeniorSystem design
Как управлять транзакциями базы данных и откатами в FastAPI с SQLAlchemy?
Транзакции в FastAPI+SQLAlchemy управляются через dependency с yield: при успехе — commit, при исключении — rollback. Для частичных откатов используйте begin_nested() (SAVEPOINT).
Базовая модель: сессия через Dependency Injection
SQLAlchemy AsyncSession оборачивается в FastAPI dependency с yield. Транзакция начинается автоматически при первом запросе к БД и фиксируется (или откатывается) в блоке finally.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from fastapi import Depends
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=10,
max_overflow=20,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Использование в endpoint
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
app = FastAPI()
@app.post("/transfer")
async def transfer(
from_id: int,
to_id: int,
amount: float,
db: AsyncSession = Depends(get_db),
):
from_acc = await db.get(Account, from_id)
to_acc = await db.get(Account, to_id)
if from_acc.balance < amount:
raise HTTPException(400, "Insufficient funds") # автоматический rollback
from_acc.balance -= amount
to_acc.balance += amount
# commit выполнится в dependency при успешном выходе из yield
return {"status": "ok"}
Savepoints и вложенные транзакции
SQLAlchemy поддерживает begin_nested() для создания SAVEPOINT. Позволяет откатить часть операций без потери всей транзакции:
async def create_order_with_fallback(db: AsyncSession, order_data: dict):
async with db.begin_nested() as nested:
try:
order = Order(**order_data)
db.add(order)
await db.flush() # проверяем constraints без commit
await send_notification(order.id) # может упасть
except Exception:
await nested.rollback() # откат только вложенной части
order = Order(**order_data, notify=False)
db.add(order)
await db.flush()
# внешняя транзакция продолжает работу
Явный контроль транзакций
@app.post("/batch")
async def batch_insert(items: list[ItemCreate], db: AsyncSession = Depends(get_db)):
async with db.begin(): # явная транзакция
for item in items:
db.add(Item(**item.model_dump()))
# commit вызывается автоматически при выходе из context manager
return {"inserted": len(items)}
Isolation level
Для критичных операций (например, вычитание остатка) устанавливайте уровень изоляции на уровне соединения или движка:
engine = create_async_engine(
DATABASE_URL,
execution_options={"isolation_level": "REPEATABLE READ"},
)
# или для конкретного запроса:
async with engine.begin() as conn:
await conn.execution_options(isolation_level="SERIALIZABLE")
result = await conn.execute(select(Account).where(...))
Подводные камни
- expire_on_commit=True (дефолт) — после commit все атрибуты объектов инвалидируются; при обращении к ним вне сессии возникает
DetachedInstanceError. Используйтеexpire_on_commit=Falseв async-контексте. - Долгие транзакции — держать транзакцию открытой во время HTTP-вызова к внешнему сервису блокирует строки и истощает пул соединений.
- await session.flush() vs commit() —
flush()отправляет SQL в БД но не фиксирует; данные видны только в текущей транзакции. Забытый flush перед чтением auto-increment ID — частая ошибка. - rollback при HTTPException — HTTPException — это обычный raise; dependency с
except Exception: rollbackкорректно откатывает транзакцию, но только если exception не перехвачен раньше. - Несколько сессий в одном запросе — если два Depends используют разные сессии, они видят разные состояния БД; объединяйте их в одну сессию через общий Depends.
- Savepoint не работает без begin() —
begin_nested()требует активной внешней транзакции; в autocommit-режиме вызов упадёт. - SELECT FOR UPDATE в async — используйте
with_for_update()в SQLAlchemy, а не сырой SQL, чтобы избежать дедлоков при конкурентных обновлениях.
Common mistakes
- Описывать database transactions только как термин и не показывать механизм на минимальном примере.
- Игнорировать ошибки, пустые данные, конкурентный доступ или границы транзакции.
- Не связывать поведение с официальным контрактом FastAPI и реальной эксплуатацией.
What the interviewer is testing
- Объясняет database transactions через последовательность действий, а не через набор ключевых слов.
- Приводит короткий кодовый пример или production-сценарий с ожидаемым поведением.
- Называет хотя бы один риск: производительность, безопасность, транзакции, память или сопровождение.
- Умеет обсудить отказ, наблюдаемость и rollback без изменения публичного контракта.