Kotlin CoroutinesSeniorTechnical

Что такое backpressure во flows и как Kotlin с этим справляется?

В Kotlin Flow backpressure решён архитектурно: emit() — suspend-функция, поэтому производитель автоматически ждёт потребителя. Для управления скоростью используются buffer(), conflate(), collectLatest(), BufferOverflow.DROP_OLDEST/DROP_LATEST.

Backpressure во Flow

Backpressure — ситуация, когда производитель данных генерирует их быстрее, чем потребитель успевает обработать. В RxJava это была отдельная проблема с несколькими стратегиями. В Kotlin Flow проблема решена архитектурно через механизм suspend.

Встроенная защита через suspension

Kotlin Flow — это cold, последовательный и suspend-based API. По умолчанию производитель не может генерировать следующий элемент, пока потребитель не завершит обработку предыдущего. Это называется sequential backpressure.

val flow = flow {
    repeat(1000) { i ->
        println("Emitting $i")
        emit(i)   // suspend до тех пор, пока collect не обработает элемент
    }
}

flow.collect { value ->
    delay(100)   // медленный потребитель — производитель автоматически замедляется
    println("Collected $value")
}

Производитель буквально приостанавливается на emit(), ожидая завершения collect. Никакого переполнения буфера нет — это ключевое отличие от callback-based подходов.

Операторы управления backpressure

buffer()

Добавляет буфер между производителем и потребителем. Производитель может опережать потребителя на capacity элементов.

flow.buffer(capacity = 64).collect { value ->
    process(value)
}
// По умолчанию capacity = Channel.BUFFERED (64 элемента)

conflate()

Потребитель всегда получает последний эмитированный элемент, промежуточные отбрасываются. Аналог RxJava onBackpressureLatest.

flow { repeat(100) { emit(it); delay(10) } }
    .conflate()
    .collect { value ->
        delay(100)         // медленный потребитель
        println(value)     // увидит только последние значения, пропуская промежуточные
    }

collectLatest()

При появлении нового элемента обработка предыдущего отменяется. Подходит для UI — обновлять интерфейс нужно только по последнему состоянию.

stateFlow.collectLatest { state ->
    // если придёт новый state, этот блок будет отменён
    render(state)
}

debounce() и sample()

// debounce: пропустить элемент, если следующий пришёл раньше timeout
userInput.debounce(300).collect { query -> search(query) }

// sample: брать не более одного элемента за период
sensorFlow.sample(1000).collect { reading -> updateChart(reading) }

Channel.UNLIMITED и DROP стратегии

// UNLIMITED — неограниченный буфер (осторожно с памятью!)
flow.buffer(Channel.UNLIMITED).collect { ... }

// DROP_OLDEST: при переполнении буфера удаляет самые старые элементы
flow.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST).collect { ... }

// DROP_LATEST: при переполнении отбрасывает новый элемент
flow.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_LATEST).collect { ... }

SharedFlow и StateFlow

SharedFlow — hot flow с настраиваемым буфером и стратегией при переполнении. StateFlow всегда conflates — хранит только последнее значение.

val sharedFlow = MutableSharedFlow<Event>(
    replay = 0,
    extraBufferCapacity = 100,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

Подводные камни

  • buffer(Channel.UNLIMITED) без мониторинга. Если производитель работает намного быстрее потребителя, неограниченный буфер приведёт к OutOfMemoryError при долгой работе.
  • conflate() скрывает пропущенные данные. В финансовых или аудит-приложениях нельзя терять промежуточные значения. conflate() нужно применять только там, где промежуточные состояния действительно не важны.
  • collectLatest() отменяет работу. Если блок collectLatest содержит запись в БД или сетевой запрос, отмена может оставить данные в несогласованном состоянии.
  • Backpressure отсутствует для SharedFlow с DROP. Если подписчик не успевает читать из SharedFlow с DROP_OLDEST, данные теряются молча — нет никакого исключения или сигнала.
  • buffer() запускает upstream в отдельной корутине. Это меняет контекст выполнения и может привести к неожиданному поведению при использовании flowOn вместе с buffer.
  • debounce() в тестах. Без TestCoroutineScheduler с виртуальным временем debounce делает тесты медленными или ненадёжными.

Common mistakes

  • Объяснять «backpressure во Flow» только как синтаксис и не описывать поведение runtime/compiler.
  • Игнорировать важный риск: Без явной стратегии для быстрых событий можно либо переполнить буфер, либо потерять важные промежуточные состояния.
  • Давать пример без edge case: отмены, null, recomposition, platform boundary или ошибки.

What the interviewer is testing

  • Формулирует суть темы «backpressure во Flow» своими словами и связывает ее с кодом.
  • Называет механизм: По умолчанию emit suspend-ится, пока collector не обработает значение, что делает slow consumer видимым в модели.
  • Видит production-последствие: Без явной стратегии для быстрых событий можно либо переполнить буфер, либо потерять важные промежуточные состояния.

Sources

Related topics