Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sync): enable concurrent sync start/stop [WPB-15262] #3304

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import com.wire.kalium.logic.feature.publicuser.GetAllContactsResult
import kotlinx.coroutines.flow.first

suspend fun getConversations(userSession: UserSessionScope): List<Conversation> {
userSession.syncManager.waitUntilLive()
userSession.syncExecutor.request {
waitUntilLiveOrFailure()
}

val conversations = userSession.conversations.getConversations().let {
when (it) {
Expand All @@ -53,7 +55,9 @@ suspend fun UserSessionScope.listConversations(): List<Conversation> {
}

suspend fun UserSessionScope.selectConversation(): Conversation {
syncManager.waitUntilLive()
syncExecutor.request {
waitUntilLiveOrFailure()
}

val conversations = listConversations()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.wire.kalium.logic.feature.auth.autoVersioningAuth.AutoVersionAuthScop
import com.wire.kalium.logic.feature.client.RegisterClientResult
import com.wire.kalium.logic.feature.client.RegisterClientUseCase
import com.wire.kalium.logic.feature.server.GetServerConfigResult
import com.wire.kalium.util.DelicateKaliumApi
import kotlinx.coroutines.runBlocking

class LoginCommand : CliktCommand(name = "login") {
Expand Down Expand Up @@ -110,6 +111,7 @@ class LoginCommand : CliktCommand(name = "login") {
}
}

@OptIn(DelicateKaliumApi::class)
override fun run(): Unit = runBlocking {
val loginResult = authenticate().let {
if (it !is AuthenticationResult.Success) {
Expand All @@ -132,6 +134,10 @@ class LoginCommand : CliktCommand(name = "login") {
}
}

userSession = currentContext.findOrSetObject { coreLogic.getSessionScope(userId) }
userSession = currentContext.findOrSetObject {
coreLogic.getSessionScope(userId).also {
it.syncExecutor.request { keepSyncAlwaysOn() }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class UpdateSupportedProtocolsCommand : CliktCommand(name = "update-supported-pr
private val userSession by requireObject<UserSessionScope>()

override fun run() = runBlocking {
userSession.syncManager.waitUntilLive()
userSession.syncExecutor.request {
waitUntilLiveOrFailure()
}
userSession.users.updateSupportedProtocols().fold({ failure ->
throw PrintMessage("updating supported protocols failed: $failure")
}, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ internal class MLSConversationDataSource(

private suspend fun keepCommitAndRetry(groupID: GroupID): Either<CoreFailure, Unit> {
kaliumLogger.w("Migrating failed commit to new epoch and re-trying.")

// FIXME: Sync Cyclic Dependency.
// This function can be called DURING sync. And at the same time, it waits for Sync.
// Perhaps it should be scheduled for a retry in the future, after Sync is done.
return syncManager.waitUntilLiveOrFailure().flatMap {
commitPendingProposalsWithoutRetry(groupID)
}
Expand All @@ -878,6 +880,9 @@ internal class MLSConversationDataSource(
wrapMLSRequest {
mlsClient.clearPendingCommit(idMapper.toCryptoModel(groupID))
}.flatMap {
// FIXME: Sync Cyclic Dependency.
// This function can be called DURING sync. And at the same time, it waits for Sync.
// Perhaps it should be scheduled for a retry in the future, after Sync is done.
syncManager.waitUntilLiveOrFailure()
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,7 @@ internal interface IncrementalSyncRepository {
*/
val incrementalSyncState: Flow<IncrementalSyncStatus>

/**
* Buffered flow of [ConnectionPolicy].
* - Has a replay size of 1, so the latest
* value is always immediately available for new observers.
* - Doesn't emit repeated values.
* - It has a limited buffer of size [BUFFER_SIZE]
* that will drop the oldest values if the buffer is full
* to prevent emissions from being suspended due to slow
* collectors.
* @see [BufferOverflow]
*/
val connectionPolicyState: Flow<ConnectionPolicy>
suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus)
suspend fun setConnectionPolicy(connectionPolicy: ConnectionPolicy)

companion object {
// The same default buffer size used by Coroutines channels
Expand All @@ -80,28 +67,13 @@ internal class InMemoryIncrementalSyncRepository(
.asSharedFlow()
.distinctUntilChanged()

private val _connectionPolicy = MutableSharedFlow<ConnectionPolicy>(
replay = 1,
extraBufferCapacity = BUFFER_SIZE,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

override val connectionPolicyState = _connectionPolicy
.asSharedFlow()
.distinctUntilChanged()

init {
_syncState.tryEmit(IncrementalSyncStatus.Pending)
_connectionPolicy.tryEmit(ConnectionPolicy.KEEP_ALIVE)
}

override suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus) {
logger.i("IncrementalSyncStatus Updated FROM:${_syncState.first()}; TO: $newState")
_syncState.emit(newState)
}

override suspend fun setConnectionPolicy(connectionPolicy: ConnectionPolicy) {
logger.i("IncrementalSync Connection Policy changed: $connectionPolicy")
_connectionPolicy.emit(connectionPolicy)
}
}
Loading
Loading