ViewModel中StateFlow和SharedFlow单元测试使用详解
作者:linversion
概念简介
StateFlow和SharedFlow都是kotlin中的数据流,官方概念简介如下:
StateFlow:一个状态容器式可观察数据流,可以向其收集器发出当前状态和新状态。是热数据流。
SharedFlow:StateFlow是StateFlow的可配置性极高的泛化数据流(StateFlow继承于SharedFlow)
对于两者的基本使用以及区别,此处不做详解,可以参考官方文档。本文会给出一些关于如何在业务中选择选择合适热流(hot flow)的建议,以及单元测试代码。
StateFlow的一般用法如下图所示:
以读取数据库数据为例,Repository负责从数据库读取相应数据并返回一个flow,在ViewModel收集这个flow中的数据并更新状态(StateFlow),在MVVM模型中,ViewModel中暴露出来的StateFlow应该是UI层中唯一的可信数据来源,注意是唯一,这点跟使用LiveData的时候不同。
我们应该在ViewModel中暴露出热流(StateFlow或者SharedFlow)而不是冷流(Flow)
如果我们如果暴露出的是普通的冷流,会导致每次有新的流收集者时就会触发一次emit,造成资源浪费。所以如果Repository提供的只有简单的冷流怎么办?很简单,将之转换成热流就好了!通常可以采用以下两种方式:
1、还是正常收集冷流,收集到一个数据就往另外构建的StateFlow或SharedFlow发送
2、使用stateIn或shareIn拓展函数转换成热流
既然官方给我们提供了拓展函数,那肯定是直接使用这个方案最好,使用方式如下:
private const val DEFAULT_TIMEOUT = 500L @HiltViewModel class MyViewModel @Inject constructor( userRepository: UserRepository ): ViewModel() { val userFlow: StateFlow<UiState> = userRepository .getUsers() .asResult() // 此处返回Flow<Result<User>> .map { result -> when(result) { is Result.Loading -> UiState.Loading is Result.Success -> UiState.Success(result.data) is Result.Error -> UiState.Error(result.exception) } } .stateIn( scope = viewModelScope, initialValue = UiState.Loading, started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT) ) // started参数保证了当配置改变时不会重新触发订阅 }
在一些业务复杂的页面,比如首页,通常会有多个数据来源,也就有多个flow,为了保证单一可靠数据源原则,我们可以使用combine函数将多个flow组成一个flow,然后再使用stateIn函数转换成StateFlow。
shareIn拓展函数使用方式也是类似的,只不过没有初始值initialValue参数,此处不做赘述。
这两者如何选择?
上文说到,我们应该在ViewModel中暴露出热流,现在我们有两个热流-StateFlow和SharedFlow,如何选择?
没什么特定的规则,选择的时候只需要想一下一下问题:
1.我真的需要在特定的时间、位置获取Flow的最新状态吗?
如果不需要,那考虑SharedFlow,比如常用的事件通知功能。
2.我需要重复发射和收集同样的值吗?
如果需要,那考虑SharedFlow,因为StateFlow会忽略连续两次重复的值。
3.当有新的订阅者订阅的时候,我需要发射最近的多个值吗?
如果需要,那考虑SharedFlow,可以配置replay参数。
compose中收集流的方式
关于在UI层收集ViewModel层的热流方式,官方文档已经有介绍,但是没有补充在JetPack Compose中的收集流方式,下面补充一下。
先添加依赖implementation 'androidx.lifecycle:lifecycle-runtime-compose:2.6.0-alpha03'
// 收集StateFlow val uiState by viewModel.userFlow.collectAsStateWithLifecycle() // 收集SharedFlow,区别在于需要赋初始值 val uiState by viewModel.userFlow.collectAsStateWithLifecycle( initialValue = UiState.Loading ) when(uiState) { is UiState.Loading -> TODO() is UiState.Success -> TODO() is UiState.Error -> TODO() }
使用collectAsStateWithLifecycle()也是可以保证流的收集操作之发生在应用位于前台的时候,避免造成资源浪费。
单元测试
由于我们会在ViewModel中使用到viewModelScope,首先可以定义一个MainDispatcherRule,用于设置MainDispatcher。
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.test.TestDispatcher import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.resetMain import kotlinx.coroutines.test.setMain import org.junit.rules.TestRule import org.junit.rules.TestWatcher import org.junit.runner.Description /** * A JUnit [TestRule] that sets the Main dispatcher to [testDispatcher] * for the duration of the test. */ class MainDispatcherRule( val testDispatcher: TestDispatcher = UnconfinedTestDispatcher() ) : TestWatcher() { override fun starting(description: Description) { super.starting(description) Dispatchers.setMain(testDispatcher) } override fun finished(description: Description) { super.finished(description) Dispatchers.resetMain() } }
将MainDispatcherRule用于ViewModel单元测试代码中:
class MyViewModelTest { @get:Rule val mainDispatcherRule = MainDispatcherRule() ... }
1.测试StateFlow
现在我们有一个业务ViewModel如下:
@HiltViewModel class MyViewModel @Inject constructor( private val userRepository: UserRepository ) : ViewModel() { private val _userFlow = MutableStateFlow<UiState>(UiState.Loading) val userFlow: StateFlow<UiState> = _userFlow.asStateFlow() fun onRefresh() { viewModelScope.launch { userRepository .getUsers().asResult() .collect { result -> _userFlow.update { when (result) { is Result.Loading -> UiState.Loading is Result.Success -> UiState.Success(result.data) is Result.Error -> UiState.Error(result.exception) } } } } } }
单元测试代码如下:
class MyViewModelTest{ @get:Rule val mainDispatcherRule = MainDispatcherRule() // arrange private val repository = TestUserRepository() @OptIn(ExperimentalCoroutinesApi::class) @Test fun `when initialized, repository emits loading and data`() = runTest { // arrange val viewModel = MyViewModel(repository) val users = listOf(...) // 初始值应该是UiState.Loading,因为stateFlow可以直接获取最新值,此处直接做断言 assertEquals(UiState.Loading, viewModel.userFlow.value) // action repository.sendUsers(users) viewModel.onRefresh() //check assertEquals(UiState.Success(users), viewModel.userFlow.value) } } // Mock UserRepository class TestUserRepository : UserRepository { /** * The backing hot flow for the list of users for testing. */ private val usersFlow = MutableSharedFlow<List<User>>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) override fun getUsers(): Flow<List<User>> { return usersFlow } /** * A test-only API to allow controlling the list of users from tests. */ suspend fun sendUsers(users: List<User>) { usersFlow.emit(users) } }
如果ViewModel中使用的是stateIn拓展函数:
@OptIn(ExperimentalCoroutinesApi::class) @Test fun `when initialized, repository emits loading and data`() = runTest { //arrange val viewModel = MainWithStateinViewModel(repository) val users = listOf(...) //action // 因为此时collect操作并不是在ViewModel中,我们需要在测试代码中执行collect val collectJob = launch(UnconfinedTestDispatcher(testScheduler)) { viewModel.userFlow.collect() } //check assertEquals(UiState.Loading, viewModel.userFlow.value) //action repository.sendUsers(users) //check assertEquals(UiState.Success(users), viewModel.userFlow.value) collectJob.cancel() }
2.测试SharedFlow
测试SharedFlow可以使用一个开源库Turbine,Turbine是一个用于测试Flow的小型开源库。
测试使用sharedIn拓展函数的SharedFlow:
@OptIn(ExperimentalCoroutinesApi::class) @Test fun `when initialized, repository emits loading and data`() = runTest { val viewModel = MainWithShareInViewModel(repository) val users = listOf(...) repository.sendUsers(users) viewModel.userFlow.test { val firstItem = awaitItem() assertEquals(UiState.Loading, firstItem) val secondItem = awaitItem() assertEquals(UiState.Success(users), secondItem) } }
以上就是ViewModel中StateFlow和SharedFlow单元测试使用详解的详细内容,更多关于ViewModel StateFlow SharedFlow测试的资料请关注脚本之家其它相关文章!