Kotlin Flow in Android

Author Avatar
xjunz 3月 07, 2023
  • 在其它设备中阅读本文章

Kotlin Flow in Android

如果您发现文章中的任何错误或疏漏,欢迎批评指出。

Flow的基本用法

Flow即流,它具有流式地产出多个数据的能力,但是它作为“冷流”,只有在被collect时才会生产数据,并且它只支持单个Collector。如果你愿意,我们也可以把他理解为Kotlin官方复刻的RxJava,用以实现响应式(Reactive)编程。

  • 创建Flow

    在使用flow{}构建Flow时,我们需要初始化一个代码块,也被叫做Producer(生产者)Block,它会在Flow被collect时执行,此blocksuspend修饰,这意味着它会在协程中被执行。

    image-20230225050017470

  • 收集/监听Flow

    image-20230225050246408

  • 使用操作符

    类似于RxJava,Flow也拥有众多的操作符以满足各式的需求:

    image-20230225050129426

  • 感知生命周期

    在Android中使用Flow时,我们应当注意Android组件的生命周期,这可以帮助我们节省系统资源,也可以预防不必要的错误。

    当然,官方已经给我们提供了三种API:

    image-20230225050342748

    官方建议使用LifeCycle.repeatOnLifecycle而不是用lifecycleScope.launch或者lifecycleScope.launchWhenX,因为第一种方法会自动在onStop时停止收集,在onStart重新开始收集,而后两个方法并不会:

    image-20230225050434405

    image-20230225050742719

  • 处理Configuration Changes

    若要面对Configuration Change,你不应该直接在ViewModel内向外暴露一个Flow,因为Flow的Producer块会在每次被Collect的时候执行,这会导致如屏幕旋转发生后,这可能产生意外的后果。这时候,我们需要一个能缓存数据的Flow。

    image-20230225051015183

引入StateFlow

StateFlow类似于LiveData,它能储存数据并且分发给多个Observers/Collectors。而与LiveData不同的是,StateFlow必须拥有初始值,并且LiveData天生具有生命周期感知能力,而StateFlow需要我们手动调用API感知生命周期。

  • 直接使用MutableStateFlow

    image-20230225051509636

  • 使用stateIn将Flow转为StateFlow

    image-20230225051851102

    应当注意 started 参数的含义:它代表当此StateFlow被再次collect时,以何种形式启动上游Flow,官方推荐的做法是 WhileSubscribed(5000)

    想象两个场景:屏幕旋转和用户回到桌面然后返回APP,这两个场景都会导致Android生命周期组件(Activity, Fragment)走onStop(),然后重新走onStart(),这会使StateFlow取消被collect然后重新被collect。对于屏幕旋转这种快速的生命周期重启,我们希望StateFlow的上游Flow不要重新启动以快速完成屏幕旋转。而对于用户回到桌面,我们应当设定一个阈值,如果用户离开的时间超过了此阈值,就暂停上游Flow以节省系统资源。而官方推荐的阈值就是五秒,即StateFlow被取消collect到下次重新被collect之间如果不超过五秒,上游Flow就不需要重新启动。如下图:

    image-20230225055323411

“热流”SharedFlow

所谓“热流”是相对于“冷流”Flow而言的。SharedFlow支持缓存数据、直接启动并且支持多个Collector(这也是它名字的的由来)。上述的StateFlow就是SharedFlow的一个子类。如果要面对更加复杂的需求,我们可以构建自定义的SharedFlow,比如实现单次事件SingleLiveEvent

  • 直接使用MutableSharedFlow

    // Backing property to avoid flow emissions from other classes
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
    val tickFlow: SharedFlow<Event<String>> = _tickFlow
    
    init {
        externalScope.launch {
            while(true) {
                _tickFlow.emit(Unit)
                delay(tickIntervalMs)
            }
        }
    }
    

    SharedFlow 的构造器拥有若干个参数:

    • replay:您可以针对新订阅者重新发送多个之前已发出的值,如果为0,就可以实现单次事件;如果为1就是StateFlow或者LiveData。
    • onBufferOverflow:您可以指定相关策略来处理缓冲区中已存满要发送的数据项的情况(背压)。默认值为 BufferOverflow.SUSPEND,这会使调用方挂起。其他选项包括 DROP_LATESTDROP_OLDEST

    MutableSharedFlow 还具有 subscriptionCount 属性,其中包含处于活跃状态的收集器的数量,以便您相应地优化业务逻辑。MutableSharedFlow 还包含一个 resetReplayCache 函数,供您在不想重放已向数据流发送的最新信息的情况下使用。

  • 使用sharedIn将Flow转为SharedFlow

    class NewsRemoteDataSource(...,
        private val externalScope: CoroutineScope,
    ) {
        val latestNews: Flow<List<ArticleHeadline>> = flow {
            ...
        }.shareIn(
            externalScope,
            replay = 1,
            started = SharingStarted.WhileSubscribed()
        )
    }
    

    在此示例中,latestNews 数据流将上次发出的数据项重放至新收集器,只要 externalScope 处于活跃状态并且存在活跃收集器,它就会一直处于活跃状态。当存在活跃订阅者时,SharingStarted.WhileSubscribed()“启动”政策将使上游提供方保持活跃状态。可使用其他启动政策,例如使用 SharingStarted.Eagerly 可立即启动提供方,使用 SharingStarted.Lazily 可在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态。

参考:

https://developer.android.com/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn

https://youtu.be/fSB6_KE95bU

知识共享许可协议
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

本文链接:http://www.xjunz.top/post/flow-in-android/