KotlinMiddleTechnical

Что такое Kotlin Flows и как они соотносятся с RxJava?

Kotlin Flow — coroutine-native cold stream с поддержкой structured cancellation и suspend-операторов; RxJava строится на Observable/Flowable со своими schedulers. Flow проще интегрируется с корутинами, backpressure решается через suspension, buffer и conflate.

Kotlin Flow: основы

Flow — это cold stream: код внутри flow { } не выполняется до вызова терминального оператора (collect, toList, first). Каждый подписчик получает независимую последовательность.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun userUpdates(userId: Long): Flow<String> = flow {
    repeat(5) { i ->
        delay(200)          // suspend — не блокирует поток
        emit("update-$i")   // отправка элемента
    }
}

fun main() = runBlocking {
    userUpdates(1L)
        .filter { it.endsWith("2") || it.endsWith("4") }
        .collect { println(it) }
    // update-2
    // update-4
}

Hot streams: SharedFlow и StateFlow

StateFlow — аналог BehaviorSubject в RxJava: хранит последнее значение и немедленно отдаёт его новым подписчикам. SharedFlow — аналог PublishSubject: без хранения, но с настраиваемым replay.

val _uiState = MutableStateFlow(UiState.Loading)
val uiState: StateFlow<UiState> = _uiState.asStateFlow()

// В ViewModel:
_uiState.value = UiState.Success(data)

// В UI (Android / Compose):
viewLifecycleOwner.lifecycleScope.launch {
    uiState.collect { state -> render(state) }
}

Backpressure: buffer, conflate, collectLatest

  • buffer(capacity) — буферизует элементы, producer и consumer работают в разных корутинах.
  • conflate() — пропускает промежуточные значения, если consumer не успевает.
  • collectLatest { } — отменяет обработку предыдущего элемента при приходе нового.
tickerFlow(50)
    .conflate()
    .collect { tick ->
        heavyRender(tick) // может занимать 200ms — промежуточные тики пропускаются
    }

Сравнение с RxJava

АспектKotlin FlowRxJava 3
ПотокиCoroutineDispatcher (IO, Default, Main)Scheduler (io(), computation(), mainThread())
Cold streamFlowObservable / Flowable
Hot streamStateFlow / SharedFlowBehaviorSubject / PublishSubject
BackpressureSuspension + buffer/conflateFlowable + BackpressureStrategy
ОтменаStructured cancellation (scope)Disposable.dispose()
Ошибкиcatch { } оператор / try-catch в suspendonError callback

Переход с RxJava на Flow

// Адаптер rxjava3-coroutines
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx3:1.9.0")
}

// Observable -> Flow
val flow: Flow<T> = observable.asFlow()

// Flow -> Observable
val observable: Observable<T> = flow.asObservable()

Подводные камни

  • Утечка корутин при collect вне lifecycleScope. Вызов flow.collect в GlobalScope или без привязки к lifecycle приведёт к утечке после уничтожения Activity/Fragment.
  • Механическая замена Observable на Flow. Если в Rx-цепочке были stateful операторы (scan, window, groupBy) — у них разные семантики в Flow; нужна переработка логики.
  • StateFlow не триггерит emit при одинаковом значении. stateFlow.value = sameValue ничего не испускает — в отличие от MutableSharedFlow.
  • flowOn меняет только upstream. flowOn(Dispatchers.IO) влияет только на код выше по цепочке, но не на collect.
  • Exception transparency. Бросать исключение внутри flow { } и ловить его снаружи — нормально, но нарушение контракта (бросок внутри оператора catch) приведёт к непредсказуемому поведению.
  • Отсутствие retry в RxJava-стиле. retry { } в Flow есть, но retryWhen требует иного подхода с Flow<Throwable>.
  • SharedFlow replay и replay cache. При неправильно заданном replay новые подписчики могут получить устаревшие события или не получить ничего.
  • Многопоточная эмиссия. Эмитить из нескольких корутин в один flow { } нельзя; для этого используйте channelFlow { } или MutableSharedFlow с emit (которое suspend-safe).

Common mistakes

  • Объяснять «Kotlin Flow и RxJava» только как синтаксис и не описывать поведение runtime/compiler.
  • Игнорировать важный риск: Механическая миграция Rx цепочек без пересмотра cancellation и lifecycle часто оставляет лишние подписки.
  • Давать пример без edge case: отмены, null, recomposition, platform boundary или ошибки.

What the interviewer is testing

  • Формулирует суть темы «Kotlin Flow и RxJava» своими словами и связывает ее с кодом.
  • Называет механизм: RxJava строится вокруг Observable/Single/Flowable и schedulers; Flow интегрируется с CoroutineContext, backpressure решается suspension, buffer и conflate.
  • Видит production-последствие: Механическая миграция Rx цепочек без пересмотра cancellation и lifecycle часто оставляет лишние подписки.

Sources

Related topics