KotlinMiddleTechnical
Что такое Kotlin Flows и как они соотносятся с RxJava?
Kotlin Flow — coroutine-native cold stream с поддержкой structured cancellation и suspend-операторов; RxJava строится на Observable/Flowable со своими schedulers. Flow проще интегрируется с корутинами, backpressure решается через suspension, buffer и conflate.
Kotlin Flow: основы
Flow — это cold stream: код внутри flow { } не выполняется до вызова терминального оператора (collect, toList, first). Каждый подписчик получает независимую последовательность.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun userUpdates(userId: Long): Flow<String> = flow {
repeat(5) { i ->
delay(200) // suspend — не блокирует поток
emit("update-$i") // отправка элемента
}
}
fun main() = runBlocking {
userUpdates(1L)
.filter { it.endsWith("2") || it.endsWith("4") }
.collect { println(it) }
// update-2
// update-4
}
Hot streams: SharedFlow и StateFlow
StateFlow — аналог BehaviorSubject в RxJava: хранит последнее значение и немедленно отдаёт его новым подписчикам. SharedFlow — аналог PublishSubject: без хранения, но с настраиваемым replay.
val _uiState = MutableStateFlow(UiState.Loading)
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
// В ViewModel:
_uiState.value = UiState.Success(data)
// В UI (Android / Compose):
viewLifecycleOwner.lifecycleScope.launch {
uiState.collect { state -> render(state) }
}
Backpressure: buffer, conflate, collectLatest
buffer(capacity)— буферизует элементы, producer и consumer работают в разных корутинах.conflate()— пропускает промежуточные значения, если consumer не успевает.collectLatest { }— отменяет обработку предыдущего элемента при приходе нового.
tickerFlow(50)
.conflate()
.collect { tick ->
heavyRender(tick) // может занимать 200ms — промежуточные тики пропускаются
}
Сравнение с RxJava
| Аспект | Kotlin Flow | RxJava 3 |
|---|---|---|
| Потоки | CoroutineDispatcher (IO, Default, Main) | Scheduler (io(), computation(), mainThread()) |
| Cold stream | Flow | Observable / Flowable |
| Hot stream | StateFlow / SharedFlow | BehaviorSubject / PublishSubject |
| Backpressure | Suspension + buffer/conflate | Flowable + BackpressureStrategy |
| Отмена | Structured cancellation (scope) | Disposable.dispose() |
| Ошибки | catch { } оператор / try-catch в suspend | onError callback |
Переход с RxJava на Flow
// Адаптер rxjava3-coroutines
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx3:1.9.0")
}
// Observable -> Flow
val flow: Flow<T> = observable.asFlow()
// Flow -> Observable
val observable: Observable<T> = flow.asObservable()
Подводные камни
- Утечка корутин при collect вне lifecycleScope. Вызов
flow.collectвGlobalScopeили без привязки к lifecycle приведёт к утечке после уничтожения Activity/Fragment. - Механическая замена Observable на Flow. Если в Rx-цепочке были stateful операторы (scan, window, groupBy) — у них разные семантики в Flow; нужна переработка логики.
- StateFlow не триггерит emit при одинаковом значении.
stateFlow.value = sameValueничего не испускает — в отличие отMutableSharedFlow. - flowOn меняет только upstream.
flowOn(Dispatchers.IO)влияет только на код выше по цепочке, но не наcollect. - Exception transparency. Бросать исключение внутри
flow { }и ловить его снаружи — нормально, но нарушение контракта (бросок внутри оператораcatch) приведёт к непредсказуемому поведению. - Отсутствие retry в RxJava-стиле.
retry { }в Flow есть, ноretryWhenтребует иного подхода сFlow<Throwable>. - SharedFlow replay и replay cache. При неправильно заданном
replayновые подписчики могут получить устаревшие события или не получить ничего. - Многопоточная эмиссия. Эмитить из нескольких корутин в один
flow { }нельзя; для этого используйтеchannelFlow { }илиMutableSharedFlowсemit(которое suspend-safe).
Common mistakes
- Объяснять «Kotlin Flow и RxJava» только как синтаксис и не описывать поведение runtime/compiler.
- Игнорировать важный риск: Механическая миграция Rx цепочек без пересмотра cancellation и lifecycle часто оставляет лишние подписки.
- Давать пример без edge case: отмены, null, recomposition, platform boundary или ошибки.
What the interviewer is testing
- Формулирует суть темы «Kotlin Flow и RxJava» своими словами и связывает ее с кодом.
- Называет механизм: RxJava строится вокруг Observable/Single/Flowable и schedulers; Flow интегрируется с CoroutineContext, backpressure решается suspension, buffer и conflate.
- Видит production-последствие: Механическая миграция Rx цепочек без пересмотра cancellation и lifecycle часто оставляет лишние подписки.