Kotlin CoroutinesMiddleTechnical

Что делает Dispatchers.IO.limitedParallelism() и когда это полезно?

limitedParallelism(n) создаёт представление диспетчера с ограниченным числом потоков из исходного пула. Используется для изоляции тяжёлых задач, чтобы они не занимали весь Dispatchers.IO или Default.

Dispatchers.IO.limitedParallelism() — детальный разбор

limitedParallelism(parallelism: Int) — метод класса CoroutineDispatcher, добавленный в kotlinx-coroutines 1.6.0. Создаёт новый диспетчер-представление, который использует потоки родительского пула, но не более чем parallelism одновременно.

Как это работает под капотом

В отличие от newFixedThreadPoolContext, limitedParallelism не создаёт новые потоки. Он добавляет ограничитель поверх существующего пула через семафор. Потоки берутся из родительского диспетчера, но задачи ставятся в очередь если лимит достигнут.

// Проверим поведение
val limitedDispatcher = Dispatchers.IO.limitedParallelism(2)

suspend fun demonstrateLimited() = coroutineScope {
    repeat(5) { i ->
        launch(limitedDispatcher) {
            println("Task $i starting on ${Thread.currentThread().name}")
            delay(1000) // simulate IO
            println("Task $i done")
        }
    }
}
// Вывод: только 2 задачи запускаются одновременно
// Task 0 starting on DefaultDispatcher-worker-1
// Task 1 starting on DefaultDispatcher-worker-2
// (после 1000мс)
// Task 2 starting on DefaultDispatcher-worker-1
// Task 3 starting on DefaultDispatcher-worker-2
// Task 4 starting on DefaultDispatcher-worker-1

Практические сценарии применения

Сценарий 1: изоляция тяжёлых загрузок

class ImageLoader {
    // Максимум 3 параллельных загрузки — не засоряем весь IO пул
    private val downloadDispatcher = Dispatchers.IO.limitedParallelism(3)

    suspend fun downloadImage(url: String): Bitmap =
        withContext(downloadDispatcher) {
            URL(url).openStream().use { BitmapFactory.decodeStream(it) }
        }

    // Остальные IO-операции (БД, файлы) продолжают работать нормально
    suspend fun saveToCache(bitmap: Bitmap, key: String) =
        withContext(Dispatchers.IO) { // полный IO пул
            cacheDir.resolve(key).outputStream().use { bitmap.compress(PNG, 90, it) }
        }
}

Сценарий 2: rate limiting для API

class ApiClient(private val service: ApiService) {
    // Ограничиваем параллельные запросы к API (rate limiting)
    private val apiDispatcher = Dispatchers.IO.limitedParallelism(5)

    suspend fun batchFetch(ids: List<Long>): List<Item> = coroutineScope {
        ids.map { id ->
            async(apiDispatcher) {
                service.getItem(id) // max 5 параллельных
            }
        }.awaitAll()
    }
}

Сценарий 3: CPU-bound задачи с контролем ресурсов

class VideoProcessor {
    // Не занимаем все ядра — оставляем 1 для UI и системы
    private val cpuCount = Runtime.getRuntime().availableProcessors()
    private val processingDispatcher = Dispatchers.Default.limitedParallelism(
        maxOf(1, cpuCount - 1)
    )

    suspend fun encodeVideos(videos: List<File>): List<File> = coroutineScope {
        videos.map { video ->
            async(processingDispatcher) {
                encode(video)
            }
        }.awaitAll()
    }
}

Сравнение с альтернативами

// Вариант А: limitedParallelism — переиспользует потоки из пула
val limited = Dispatchers.IO.limitedParallelism(4)
// + нет создания новых потоков
// + автоматически управляется JVM
// - не изолирован от других задач на родительском пуле

// Вариант Б: newFixedThreadPoolContext — отдельный пул
val isolated = newFixedThreadPoolContext(4, "my-pool")
// + полная изоляция
// - нужно вызвать close() при завершении
// - создаёт реальные OS-потоки

// Вариант В: Semaphore — явный контроль
val semaphore = Semaphore(4)
launch(Dispatchers.IO) {
    semaphore.withPermit { doWork() }
}
// + максимальная гибкость
// - boilerplate

Подводные камни

  • limitedParallelism на Dispatchers.Main бессмысленен — Main всегда однопоточен; вызов выбросит исключение в некоторых версиях корутин.
  • Диспетчер с ограничением НЕ является singleton автоматически — создавайте его один раз (в companion object или DI), иначе каждый экземпляр класса создаёт отдельный ограничитель.
  • При limitedParallelism(1) создаётся однопоточный последовательный исполнитель — это не то же самое что Dispatchers.Main, это не UI-поток.
  • Родительский пул не знает об ограничении — 10 разных limitedParallelism(5) от Dispatchers.IO теоретически займут все 50 потоков из 64.
  • Тестирование: limitedParallelism не переопределяется через Dispatchers.setMain() — в тестах нужен отдельный TestDispatcher.
  • Kotlin/Native не поддерживает limitedParallelism так же как JVM — поведение может отличаться в KMP-проектах на iOS.
  • При малом значении (например, 1–2) и большой очереди задач — latency вырастает нелинейно; мониторить через метрики времени ожидания.

Common mistakes

  • Отвечать определением без production-сценария.
  • Не называть runtime boundary, security boundary или failure mode.
  • Игнорировать версию API, observability и тестовую проверку.

What the interviewer is testing

  • Объясняет механизм своими словами и без выдуманных API.
  • Называет реальные риски, диагностику и критерий корректности.
  • Связывает ответ с текущей документацией и миграционными ограничениями.

Sources

Related topics