SQLAlchemyJuniorCoding

Как выполнять сырые SQL-запросы в SQLAlchemy, оставаясь в рамках ORM-сессии?

Используйте session.execute(text('SELECT ...'), {'param': value}) для сырых SQL в рамках текущей транзакции ORM-сессии. Параметры через :name защищают от инъекций; после DML нужен явный session.commit().

Сырые SQL-запросы в рамках ORM-сессии

Иногда нужно выполнить SQL, который ORM не умеет строить — оконные функции, UPSERT, рекурсивные CTE. SQLAlchemy позволяет делать это прямо через сессию, не разрывая транзакцию.

text() — самый простой способ

from sqlalchemy import text
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Простой SELECT
    result = session.execute(
        text("SELECT id, name FROM users WHERE age > :min_age"),
        {"min_age": 18}
    )
    for row in result:
        print(row.id, row.name)

    # DML — UPDATE/INSERT/DELETE тоже через text()
    session.execute(
        text("UPDATE users SET last_login = NOW() WHERE id = :uid"),
        {"uid": 42}
    )
    session.commit()

Параметры через :name и словарь защищают от SQL-инъекций. Никогда не используйте f-строки для подстановки пользовательских данных.

Маппинг результатов на ORM-модели

from sqlalchemy import text

with Session(engine) as session:
    # Результат как list[User] через mappings()
    result = session.execute(
        text("SELECT * FROM users WHERE active = true")
    )
    # Преобразуем строки в dict
    rows = result.mappings().all()   # list[RowMapping]
    for row in rows:
        print(row["id"], row["name"])

connection.execute() внутри сессии

with Session(engine) as session:
    # Получить underlying connection (та же транзакция)
    conn = session.connection()
    result = conn.execute(
        text("SELECT pg_size_pretty(pg_database_size(current_database()))"
    ))
    print(result.scalar())

Async-версия

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text

async def bulk_update(session: AsyncSession, ids: list[int]) -> None:
    await session.execute(
        text("UPDATE jobs SET is_fresh = false WHERE id = ANY(:ids)"),
        {"ids": ids}
    )
    await session.commit()

Когда лучше использовать Core вместо text()

from sqlalchemy import insert, update
from sqlalchemy.dialects.postgresql import insert as pg_insert

# PostgreSQL UPSERT через Core (не text)
stmt = (
    pg_insert(User)
    .values(id=1, name="Alice", email="alice@example.com")
    .on_conflict_do_update(
        index_elements=["id"],
        set_={"name": "Alice Updated"}
    )
)
session.execute(stmt)

Core-выражения безопаснее, поддерживают разные диалекты и легче тестируются, чем строки.

Подводные камни

  • f-строки в text(): text(f"WHERE id = {user_id}") — уязвимость SQL-инъекции. Всегда :param + словарь.
  • Транзакция и autocommit: session.execute(text(...)) работает внутри текущей транзакции. Без session.commit() DML-изменения откатятся при закрытии сессии.
  • PostgreSQL-специфичный синтаксис: ANY(:ids) с массивом работает, но тип параметра должен быть Python-list — asyncpg сам сериализует его. С psycopg2 нужен python_pq_binary_format или cast.
  • ORM-кэш и text(): после DML через text() ORM не знает о изменениях в кэше identity map — вызовите session.expire_all() если потом читаете изменённые объекты через ORM.
  • Нет автоматической защиты от типов: text() не проверяет соответствие параметров колонкам — ошибки всплывут только в runtime.

Common mistakes

  • Описывать raw sql in session только как термин и не показывать механизм на минимальном примере.
  • Игнорировать ошибки, пустые данные, конкурентный доступ или границы транзакции.
  • Не связывать поведение с официальным контрактом SQLAlchemy и реальной эксплуатацией.

What the interviewer is testing

  • Объясняет raw sql in session через последовательность действий, а не через набор ключевых слов.
  • Приводит короткий кодовый пример или production-сценарий с ожидаемым поведением.
  • Называет хотя бы один риск: производительность, безопасность, транзакции, память или сопровождение.

Sources

Related topics