Что такое backpressure во flows и как Kotlin с этим справляется?
В Kotlin Flow backpressure решён архитектурно: emit() — suspend-функция, поэтому производитель автоматически ждёт потребителя. Для управления скоростью используются buffer(), conflate(), collectLatest(), BufferOverflow.DROP_OLDEST/DROP_LATEST.
Backpressure во Flow
Backpressure — ситуация, когда производитель данных генерирует их быстрее, чем потребитель успевает обработать. В RxJava это была отдельная проблема с несколькими стратегиями. В Kotlin Flow проблема решена архитектурно через механизм suspend.
Встроенная защита через suspension
Kotlin Flow — это cold, последовательный и suspend-based API. По умолчанию производитель не может генерировать следующий элемент, пока потребитель не завершит обработку предыдущего. Это называется sequential backpressure.
val flow = flow {
repeat(1000) { i ->
println("Emitting $i")
emit(i) // suspend до тех пор, пока collect не обработает элемент
}
}
flow.collect { value ->
delay(100) // медленный потребитель — производитель автоматически замедляется
println("Collected $value")
}
Производитель буквально приостанавливается на emit(), ожидая завершения collect. Никакого переполнения буфера нет — это ключевое отличие от callback-based подходов.
Операторы управления backpressure
buffer()
Добавляет буфер между производителем и потребителем. Производитель может опережать потребителя на capacity элементов.
flow.buffer(capacity = 64).collect { value ->
process(value)
}
// По умолчанию capacity = Channel.BUFFERED (64 элемента)
conflate()
Потребитель всегда получает последний эмитированный элемент, промежуточные отбрасываются. Аналог RxJava onBackpressureLatest.
flow { repeat(100) { emit(it); delay(10) } }
.conflate()
.collect { value ->
delay(100) // медленный потребитель
println(value) // увидит только последние значения, пропуская промежуточные
}
collectLatest()
При появлении нового элемента обработка предыдущего отменяется. Подходит для UI — обновлять интерфейс нужно только по последнему состоянию.
stateFlow.collectLatest { state ->
// если придёт новый state, этот блок будет отменён
render(state)
}
debounce() и sample()
// debounce: пропустить элемент, если следующий пришёл раньше timeout
userInput.debounce(300).collect { query -> search(query) }
// sample: брать не более одного элемента за период
sensorFlow.sample(1000).collect { reading -> updateChart(reading) }
Channel.UNLIMITED и DROP стратегии
// UNLIMITED — неограниченный буфер (осторожно с памятью!)
flow.buffer(Channel.UNLIMITED).collect { ... }
// DROP_OLDEST: при переполнении буфера удаляет самые старые элементы
flow.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST).collect { ... }
// DROP_LATEST: при переполнении отбрасывает новый элемент
flow.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_LATEST).collect { ... }
SharedFlow и StateFlow
SharedFlow — hot flow с настраиваемым буфером и стратегией при переполнении. StateFlow всегда conflates — хранит только последнее значение.
val sharedFlow = MutableSharedFlow<Event>(
replay = 0,
extraBufferCapacity = 100,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
Подводные камни
- buffer(Channel.UNLIMITED) без мониторинга. Если производитель работает намного быстрее потребителя, неограниченный буфер приведёт к OutOfMemoryError при долгой работе.
- conflate() скрывает пропущенные данные. В финансовых или аудит-приложениях нельзя терять промежуточные значения. conflate() нужно применять только там, где промежуточные состояния действительно не важны.
- collectLatest() отменяет работу. Если блок collectLatest содержит запись в БД или сетевой запрос, отмена может оставить данные в несогласованном состоянии.
- Backpressure отсутствует для SharedFlow с DROP. Если подписчик не успевает читать из SharedFlow с DROP_OLDEST, данные теряются молча — нет никакого исключения или сигнала.
- buffer() запускает upstream в отдельной корутине. Это меняет контекст выполнения и может привести к неожиданному поведению при использовании flowOn вместе с buffer.
- debounce() в тестах. Без TestCoroutineScheduler с виртуальным временем debounce делает тесты медленными или ненадёжными.
Common mistakes
- Объяснять «backpressure во Flow» только как синтаксис и не описывать поведение runtime/compiler.
- Игнорировать важный риск: Без явной стратегии для быстрых событий можно либо переполнить буфер, либо потерять важные промежуточные состояния.
- Давать пример без edge case: отмены, null, recomposition, platform boundary или ошибки.
What the interviewer is testing
- Формулирует суть темы «backpressure во Flow» своими словами и связывает ее с кодом.
- Называет механизм: По умолчанию emit suspend-ится, пока collector не обработает значение, что делает slow consumer видимым в модели.
- Видит production-последствие: Без явной стратегии для быстрых событий можно либо переполнить буфер, либо потерять важные промежуточные состояния.