KtorSeniorTechnical

Что такое pipeline-система в Ktor и как происходит обработка запроса?

Pipeline в Ktor — упорядоченный набор фаз (Setup, Monitoring, Plugins, Call, Fallback), через которые проходит ApplicationCall. Каждая фаза содержит интерсепторы; proceed() передаёт управление дальше, finish() прерывает цепочку.

Pipeline-система Ktor: концепция

Pipeline — центральный механизм обработки в Ktor. Это упорядоченный список фаз (PipelinePhase), через которые последовательно проходит объект субъекта (например, ApplicationCall). Каждая фаза содержит интерсепторы — suspending-функции, которые могут выполнить логику, модифицировать субъект или досрочно завершить обработку через finish().

ApplicationCallPipeline: фазы

Серверный pipeline для обработки HTTP-запроса имеет пять фаз в порядке выполнения:

  • Setup — инициализация, подготовка контекста
  • Monitoring — логирование, трассировка, метрики (CallLogging работает здесь)
  • Plugins (ранее Features) — бизнес-плагины: Authentication, Sessions, CORS
  • Call — маршрутизация и обработчики маршрутов
  • Fallback — обработка незавершённых вызовов (404)

Путь запроса через pipeline

// Схема: входящий запрос -> ApplicationCallPipeline
// Setup -> Monitoring -> Plugins -> Call -> Fallback
// Внутри Call -> RoutingPipeline -> конкретный маршрут
// Ответ -> ApplicationSendPipeline

Добавление интерсептора к фазе

import io.ktor.server.application.*
import io.ktor.server.request.*

fun Application.configurePipeline() {
    // Перехват на фазе Monitoring для всех вызовов
    intercept(ApplicationCallPipeline.Monitoring) {
        val start = System.currentTimeMillis()
        try {
            proceed()  // передать управление следующим интерсепторам
        } finally {
            val duration = System.currentTimeMillis() - start
            application.log.info("${call.request.uri} completed in ${duration}ms")
        }
    }

    // Перехват на фазе Plugins (до маршрутизации)
    intercept(ApplicationCallPipeline.Plugins) {
        val token = call.request.headers["X-API-Token"]
        if (token == null) {
            call.respond(io.ktor.http.HttpStatusCode.Unauthorized, "Missing token")
            return@intercept finish()  // остановить pipeline
        }
        proceed()
    }
}

Создание собственного плагина через Pipeline

import io.ktor.server.application.*
import io.ktor.server.request.*
import io.ktor.util.*

val RequestIdPlugin = createApplicationPlugin("RequestId") {
    onCall { call ->
        // onCall = интерсептор фазы Call
        val requestId = call.request.headers["X-Request-Id"]
            ?: java.util.UUID.randomUUID().toString()
        // Сохранить в атрибутах call для downstream-доступа
        call.attributes.put(RequestIdKey, requestId)
    }
}

val RequestIdKey = AttributeKey<String>("RequestId")

// Использование
fun Application.configurePlugins() {
    install(RequestIdPlugin)
}

RoutingPipeline: фазы маршрутизации

import io.ktor.server.routing.*

fun Application.configureRouting() {
    routing {
        // Интерсепция на уровне routing-pipeline
        intercept(RoutingRoot.PLUGIN_REGISTRY.first()) {
            proceed()
        }

        route("/api") {
            // Интерсептор для подгруппы маршрутов
            intercept(RoutingPhase) {
                // Проверка авторизации для всей группы /api
                proceed()
            }

            get("/users") {
                call.respondText("users list")
            }
        }
    }
}

ApplicationSendPipeline

После обработчика маршрута ответ проходит через ApplicationSendPipeline с фазами: Before, Transform, Render, ContentEncoding, TransferEncoding, After. Плагин ContentNegotiation работает в фазе Render, сериализуя объекты в JSON/XML.

Порядок регистрации имеет значение

// Неверно: Authentication должна стоять до бизнес-плагинов
install(MyPlugin)
install(Authentication)  // слишком поздно

// Верно:
install(Authentication)
install(MyPlugin)

Подводные камни

  • Вызов proceed() без return@intercept после finish() продолжит выполнение кода интерсептора — всегда используйте return@intercept finish() для полной остановки.
  • Исключение внутри intercept { } без try/catch обходит StatusPages-плагин, если не настроена обработка в нужной фазе — добавляйте перехват в Monitoring с try/finally.
  • Порядок install() важен: плагины встраиваются в pipeline в порядке установки; CORS перед Authentication — иначе preflight получит 401.
  • Интерсепторы в routing-pipeline выполняются заново для каждого вложенного маршрута при RoutingResolve — не добавляйте тяжёлую логику в intercept routing без фильтрации.
  • call.attributes.put() — потокобезопасный способ передачи данных между фазами; прямое использование внешних переменных внутри интерсептора может привести к race condition при параллельных запросах.
  • В Ktor 2.x плагины создаются через createApplicationPlugin/createRouteScopedPlugin; старый API ApplicationFeature удалён — код из туториалов под Ktor 1.x не скомпилируется.
  • Фаза Fallback вызывается только если ни один маршрут не завершил вызов; если маршрут существует, но бросает исключение, Fallback не срабатывает — нужен StatusPages.
  • Нельзя добавить интерсептор к фазе, которой нет в данном pipeline — например, серверная фаза Call недоступна в клиентском pipeline, попытка вызовет InvalidPhaseException.

Common mistakes

  • Путать термин «ktor pipeline» с соседним механизмом Ktor.
  • Не называть границу lifecycle, transaction, thread или request для «ktor pipeline».
  • Игнорировать production-эффекты «ktor pipeline»: latency, SQL shape, memory, security или observability.

What the interviewer is testing

  • Попросить объяснить механизм «ktor pipeline» на минимальном примере.
  • Проверить, видит ли кандидат failure mode и диагностику для «ktor pipeline».
  • Уточнить, какие настройки или API меняют «ktor pipeline» в реальном сервисе.

Sources

Related topics