Flow Operators




🌊 Flow operators enable you to create restartable, pausable, or one-shot StateFlow, and they support KMP.

Flow operators enable you to create restartable, pausable, or one-shot StateFlow instances, so you can customize and control StateFlow behavior for your specific use case. Why are they useful? They help you manage state and control data streams efficiently in complex scenarios. You can read more about the reasons in Loading Initial Data on Android Part 2: Clear All Your Doubts.

Maven Central

Version Catalog

If you're using Version Catalog, you can configure the dependency by adding it to your libs.versions.toml file as follows:

[versions]
#...
flowOperators = "0.1.1"

[libraries]
#...
flow-operators = { module = "com.github.skydoves:flow-operators", version.ref = "flowOperators" }

Gradle

Add the dependency below to your module's build.gradle.kts file:

dependencies {
    implementation("com.github.skydoves:flow-operators:$version")
    
    // if you're using Version Catalog
    implementation(libs.flow.operators)
}

For Kotlin Multiplatform, add the dependency below to your module's build.gradle.kts file:

sourceSets {
    val commonMain by getting {
        dependencies {
            implementation("com.github.skydoves:flow-operators:$version")
        }
    }
}

RestartableStateFlow

RestartableStateFlow extends both StateFlow and Restartable, allowing the upstream flow to restart its emission. It behaves like a regular StateFlow but includes the ability to reset and restart the emission process when necessary.

Consider a scenario where you load initial data using a property, as shown in the example below, instead of initiating the load inside LaunchedEffect or ViewModel.init(). (For more details, check out Loading Initial Data in LaunchedEffect vs. ViewModel).

val posters: StateFlow<List<Poster>> = mainRepository.fetchPostersFlow()
  .filter { it.isSuccess }
  .mapLatest { it.getOrThrow() }
  .restartableStateIn(
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(5000),
    initialValue = emptyList(),
  )

By using a property, the data is loaded only when the first subscription occurs, preventing the unnecessary immediate execution of tasks and avoiding unintended side effects that might arise from ViewModel.init() or LaunchedEffect. However, another challenge arises: you may need to reload the data due to scenarios like refreshing, recovering from errors during the initial load, or other reasons. In such cases, you can seamlessly restart the upstream flow using RestartableStateFlow.

class MainViewModel(mainRepository: MainRepository) : ViewModel() {

  private val restartablePoster: RestartableStateFlow<List<Poster>> = mainRepository.fetchPostersFlow()
    .filter { it.isSuccess }
    .mapLatest { it.getOrThrow() }
    .restartableStateIn(
      scope = viewModelScope,
      started = SharingStarted.WhileSubscribed(5000),
      initialValue = emptyList(),
    )

  val posters: StateFlow<List<Poster>> = restartablePoster // don't expose the Restartable interface to the outside

  fun refresh() = restartablePoster.restart()
}

Now the UI can call refresh() to restart the upstream flow and rerun the initial load, while still observing the same posters property.

@Composable
private fun Main(mainViewModel: MainViewModel) {
  val posters by mainViewModel.posters.collectAsStateWithLifecycle()

  Column(
    modifier = Modifier
      .fillMaxSize()
      .padding(6.dp)
      .verticalScroll(rememberScrollState()),
  ) {
    Button(onClick = { mainViewModel.refresh() }) {
      Text(text = "restart")
    }

    Text(text = posters.toString())
  }
}
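
One way such a restart capability could be layered on top of stateIn is to wrap the SharingStarted strategy and merge extra SharingCommand values into its command stream. The following is a minimal sketch under that assumption, not the library's actual source. Every name prefixed with Sketch, the RestartableSharingStarted class, and the restartDemo function are hypothetical; only the kotlinx.coroutines calls are real APIs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical interfaces; the "Sketch" prefix marks them as illustrative only.
// Note: recent kotlinx.coroutines versions may report an opt-in warning
// when StateFlow is subclassed.
interface SketchRestartable {
    fun restart()
}

interface SketchRestartableStateFlow<out T> : StateFlow<T>, SketchRestartable

// Wraps any SharingStarted strategy and merges extra commands into its stream.
private class RestartableSharingStarted(
    private val delegate: SharingStarted,
) : SharingStarted, SketchRestartable {
    // Buffer room for the two commands that restart() emits back to back.
    private val restartCommands = MutableSharedFlow<SharingCommand>(extraBufferCapacity = 2)

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        merge(restartCommands, delegate.command(subscriptionCount))

    override fun restart() {
        // Stop the upstream and reset to the initial value, then collect it again.
        restartCommands.tryEmit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
        restartCommands.tryEmit(SharingCommand.START)
    }
}

fun <T> Flow<T>.sketchRestartableStateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T,
): SketchRestartableStateFlow<T> {
    val restartable = RestartableSharingStarted(started)
    val state = stateIn(scope, restartable, initialValue)
    // Delegate all StateFlow members to the real StateFlow and add restart().
    return object : SketchRestartableStateFlow<T>, StateFlow<T> by state {
        override fun restart() = restartable.restart()
    }
}

// Returns how many times the upstream was collected after one restart.
fun restartDemo(): Int = runBlocking {
    var starts = 0
    val scope = CoroutineScope(coroutineContext + Job()) // stand-in for viewModelScope
    val state = flow {
        starts++
        emit("load #$starts")
    }.sketchRestartableStateIn(scope, SharingStarted.WhileSubscribed(5_000), "initial")

    val subscriber = launch { state.collect { } } // keeps one subscriber active
    state.first { it == "load #1" } // wait for the first load
    state.restart()
    state.first { it == "load #2" } // the upstream ran again
    subscriber.cancel()
    scope.cancel()
    starts
}
```

In this sketch the restart is expressed purely as sharing commands, so the wrapped strategy (here WhileSubscribed) keeps controlling when the upstream starts and stops for ordinary subscriptions.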

PausableStateFlow

PausableStateFlow extends both StateFlow and Pausable, enabling the upstream flow to pause and resume its emission. It retains the functionality of a standard StateFlow while adding controls for pausing and resuming emissions as needed.

The core concept of PausableStateFlow is similar to RestartableStateFlow, but with an added capability: it allows you to pause and resume listening to the upstream flow. This can be particularly useful in scenarios where you want to temporarily stop processing updates from an upstream flow, such as real-time location updates, Bluetooth connection status, animations, or other continuous events.

While paused, any new subscribers to the PausableStateFlow will simply receive the latest cached value instead of actively listening to the upstream emissions.

class MainViewModel(mainRepository: MainRepository) : ViewModel() {
  
  private val pausableStateFlow: PausableStateFlow<List<Poster>> =
    mainRepository.fetchPostersFlow()
      .filter { it.isSuccess }
      .mapLatest { it.getOrThrow() }
      .pausableStateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = emptyList(),
      )

  val posters: StateFlow<List<Poster>> = pausableStateFlow

  fun pause() = pausableStateFlow.pause()
  
  fun resume() = pausableStateFlow.resume()
}

OnetimeWhileSubscribed

OnetimeWhileSubscribed is a SharingStarted strategy that ensures the upstream flow emits only once while a subscriber is active. After the initial emission, it remains idle until another active subscription appears.

When converting a cold flow into a hot flow on Android, a common approach is to pass SharingStarted.WhileSubscribed(5_000) to stateIn. The 5-second threshold (5_000) aligns with the ANR (Application Not Responding) timeout limit. If no subscribers remain for longer than 5 seconds, the upstream flow is stopped and no longer feeds your UI layer.
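
The effect of the stop timeout can be observed with plain kotlinx.coroutines. The sketch below does not use this library; it shortens the timeout to 200 ms so it runs quickly, and upstreamStartsAfterAbsence is a hypothetical helper name chosen for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how often the upstream is collected when the only subscriber
// leaves and a new one arrives after [awayMillis].
fun upstreamStartsAfterAbsence(awayMillis: Long, stopTimeoutMillis: Long = 200): Int = runBlocking {
    var starts = 0
    val scope = CoroutineScope(coroutineContext + Job()) // stand-in for viewModelScope
    val state = flow {
        starts++
        emit("loaded")
        awaitCancellation() // stay active like a long-lived data source
    }.stateIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis), "initial")

    state.first { it == "loaded" } // first subscriber arrives, then leaves
    delay(awayMillis)              // time spent without any subscriber
    val returning = launch { state.collect { } } // a subscriber comes back
    delay(100)                     // give the sharing coroutine time to react
    returning.cancel()
    scope.cancel()
    starts
}
```

Under these assumptions, an absence shorter than the timeout (for example 20 ms) should leave the count at 1 because the upstream was never stopped, while a longer absence (for example 500 ms) should raise it to 2 because the upstream is collected again.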

However, this has a side effect: if you navigate from Screen A to Screen B and stay on Screen B for more than 5 seconds, returning to Screen A restarts the upstream flow. The same task is relaunched even if it had already completed. To avoid this, you can use OnetimeWhileSubscribed as the SharingStarted strategy. It launches the upstream flow only once, when the first subscription occurs; afterwards only the latest cached value is replayed, which avoids redundant restarts during screen transitions.

class MainViewModel(mainRepository: MainRepository) : ViewModel() {
  
  val posters: StateFlow<List<Poster>> =
    mainRepository.fetchPostersFlow()
      .filter { it.isSuccess }
      .mapLatest { it.getOrThrow() }
      .stateIn(
        scope = viewModelScope,
        started = SharingStarted.OnetimeWhileSubscribed(5_000),
        initialValue = emptyList(),
      )
}
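
To illustrate the difference in behavior, the sketch below compares the standard WhileSubscribed strategy with a simplified start-once strategy. StartOnceSketch is a hypothetical stand-in written for this example and is not the library's OnetimeWhileSubscribed implementation; upstreamStarts is likewise a hypothetical helper, and the 100 ms timeout is shortened so the example runs quickly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical strategy: starts the upstream for the first subscriber only.
// An instance keeps state, so it is meant for a single stateIn call.
@OptIn(ExperimentalCoroutinesApi::class)
class StartOnceSketch(private val stopTimeoutMillis: Long) : SharingStarted {
    private val alreadyStarted = MutableStateFlow(false)

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount.transformLatest<Int, SharingCommand> { count ->
            if (count > 0) {
                // Only the very first subscription emits START.
                if (alreadyStarted.compareAndSet(false, true)) {
                    emit(SharingCommand.START)
                }
            } else {
                delay(stopTimeoutMillis)
                emit(SharingCommand.STOP) // stop the upstream, keep the cached value
            }
        }.dropWhile { it != SharingCommand.START }
}

// Counts upstream collections when a subscriber returns after the timeout.
fun upstreamStarts(started: SharingStarted): Int = runBlocking {
    var starts = 0
    val scope = CoroutineScope(coroutineContext + Job()) // stand-in for viewModelScope
    val state = flow {
        starts++
        emit("loaded")
    }.stateIn(scope, started, "initial")

    state.first { it == "loaded" } // "Screen A" subscribes, then leaves
    delay(300)                     // away for longer than the 100 ms timeout
    val returning = launch { state.collect { } } // back on "Screen A"
    delay(100)
    returning.cancel()
    scope.cancel()
    starts
}
```

With these assumptions, upstreamStarts(SharingStarted.WhileSubscribed(100)) should report 2 collections, while upstreamStarts(StartOnceSketch(100)) should report 1, since the returning subscriber only receives the cached value.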

Find this repository useful? ❤️

Support it by joining stargazers for this repository. ⭐
Also, follow me on GitHub for my next creations! 🤩

License

Designed and developed by 2025 skydoves (Jaewoong Eum)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.