SQLAlchemyMiddleExperience

Расскажите о случае, когда SQLAlchemy помог или помешал вам при миграциях, транзакциях, оптимизации запросов или борьбе с N+1.

Работал с SQLAlchemy 2.0 async: устранял N+1 через selectinload/joinedload, разрешал транзакционные гонки через with_for_update(), стабилизировал Alembic-миграции backward-compatible шагами для zero-downtime деплоя.

Контекст

На проекте с FastAPI + SQLAlchemy 2.0 (async) мне пришлось провести крупный рефакторинг: устранить N+1, исправить транзакционные гонки и стабилизировать Alembic-миграции при работе в команде.

Миграции с Alembic

Основная боль — конфликты в alembic/versions/ при параллельных ветках. Ввёл правило: каждая миграция в отдельном PR, ревью обязательно. Настроил alembic.ini для именования файлов с датой:

file_template = %%(year)d%%(month).2d%%(day).2d_%%(rev)s_%%(slug)s

Для zero-downtime деплоя разбивал изменения на backward-compatible шаги: сначала ADD COLUMN ... DEFAULT NULL, потом backfill-скрипт, потом NOT NULL constraint в отдельной миграции.

Транзакции

В async SQLAlchemy 2.0 транзакции явные:

async with async_session() as session:
    async with session.begin():
        order = Order(user_id=user_id, total=total)
        session.add(order)
        await session.flush()  # получить order.id без commit
        session.add(AuditLog(order_id=order.id, event="created"))
    # commit при выходе из begin()-контекста

Столкнулся с гонкой: два параллельных запроса читали одну строку и оба её обновляли. Исправил через with_for_update():

stmt = select(Wallet).where(Wallet.user_id == uid).with_for_update()
result = await session.execute(stmt)
wallet = result.scalar_one()

N+1 запросы

Обнаружил через sqlalchemy.engine logger (уровень INFO) и sqlalchemy-utils QueryCounterMiddleware. Типичный паттерн — итерация по заказам с обращением к order.items:

# До: N+1
orders = (await session.execute(select(Order))).scalars().all()
for o in orders:
    print(o.items)  # lazy load на каждую строку

# После: joinedload
stmt = select(Order).options(joinedload(Order.items))
orders = (await session.execute(stmt)).unique().scalars().all()

Для больших коллекций предпочёл selectinload — он делает один отдельный SELECT IN вместо JOIN, избегая дублирования строк.

Оптимизация запросов

Использовал select(Order.id, Order.status) вместо select(Order) в read-only эндпоинтах — меньше трафика по сети, быстрее десериализация. Добавил индексы через Index в Alembic-миграцию:

op.create_index("ix_orders_user_id_status",
                "orders", ["user_id", "status"])

Подводные камни

  • В async SQLAlchemy lazy load вызывает MissingGreenlet / greenlet_spawn исключение — все связи нужно загружать явно через selectinload / joinedload.
  • Alembic autogenerate не видит изменения в default-значениях и check-constraints без явного сравнения — всегда проверяйте сгенерированный скрипт.
  • session.flush() без commit делает изменения видимыми внутри транзакции, но не для других сессий.
  • expire_on_commit=True (дефолт) делает объекты expired после commit — обращение к атрибутам вне сессии вызывает DetachedInstanceError.
  • with_for_update() без таймаута может привести к бесконечному ожиданию; PostgreSQL поддерживает .with_for_update(nowait=True).
  • Миграции с op.execute("ALTER TABLE ... LOCK") блокируют таблицу — для больших таблиц используйте pg_repack или concurrent index creation.

What hurts your answer

  • Выдумывать опыт или говорить слишком общими фразами
  • Не объяснять свою личную роль в работе с SQLAlchemy
  • Не показывать результат, метрики или извлечённые уроки

What they're listening for

  • Может подготовить честный пример использования SQLAlchemy
  • Показывает свою роль, решения и результат
  • Умеет рефлексировать над trade-offs и уроками

Related topics