Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ordering of patches with cycles during upload #2524

Merged
merged 25 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
05873b1
Ordering of patches with cyclic during upload
aditya-07 Apr 23, 2024
c9e01ea
Ignored failing test
aditya-07 Apr 23, 2024
ed07d2f
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 14, 2024
9354c15
Refactored and added tests
aditya-07 May 16, 2024
fb10a0f
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 16, 2024
6575fae
Refactored
aditya-07 May 16, 2024
9a9f79d
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 27, 2024
471e5ef
Updated code to use strongly connected resources
aditya-07 May 30, 2024
d7524d6
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 30, 2024
e796958
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 3, 2024
500e7e8
Review comments: Seperated the code for finding ssc. Added some tets …
aditya-07 Jun 5, 2024
76c4840
Refactored code and added tests.
aditya-07 Jun 6, 2024
d346f0e
Refactored code and added tests.
aditya-07 Jun 6, 2024
4fc3e35
Updated StronglyConnectedPatches to compute node size
aditya-07 Jun 6, 2024
0100dca
Added comments
aditya-07 Jun 6, 2024
b3d87c5
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 6, 2024
46fd7b4
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 6, 2024
7415b62
Updated tarjan
aditya-07 Jun 10, 2024
190fda2
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 10, 2024
31d8098
Review comments: Updated docs
aditya-07 Jun 11, 2024
e2d95fa
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 11, 2024
bfcd0c9
Review comments: refactored code and renamed some classes
aditya-07 Jun 13, 2024
39c0173
Refactor
aditya-07 Jun 13, 2024
0de1b7a
Updated the code to use arrays
aditya-07 Jun 13, 2024
e320d00
Fixed failing test
aditya-07 Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import com.google.android.fhir.LocalChange
import com.google.android.fhir.db.Database

/**
* Generates [Patch]es from [LocalChange]s and output [List<[PatchMapping]>] to keep a mapping of
* the [LocalChange]s to their corresponding generated [Patch]
* Generates [Patch]es from [LocalChange]s and output [List<[StronglyConnectedPatchMappings]>] to
* keep a mapping of the [LocalChange]s to their corresponding generated [Patch]
*
* INTERNAL ONLY. This interface should NEVER been exposed as an external API because it works
* together with other components in the upload package to fulfill a specific upload strategy.
Expand All @@ -35,7 +35,7 @@ internal interface PatchGenerator {
* NOTE: different implementations may have requirements on the size of [localChanges] and output
* certain numbers of [Patch]es.
*/
suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping>
suspend fun generate(localChanges: List<LocalChange>): List<StronglyConnectedPatchMappings>
}

internal object PatchGeneratorFactory {
Expand Down Expand Up @@ -67,3 +67,14 @@ internal data class PatchMapping(
val localChanges: List<LocalChange>,
val generatedPatch: Patch,
)

/**
* Structure to describe the cyclic nature of [PatchMapping].
* - A single value in [patchMappings] signifies the acyclic nature of the node.
* - Multiple values in [patchMappings] signifies the cyclic nature of the nodes among themselves.
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
*
* [StronglyConnectedPatchMappings] is used by the engine to make sure that related resources get
* uploaded to the server in the same request to maintain the referential integrity of resources
* during creation.
*/
internal data class StronglyConnectedPatchMappings(val patchMappings: List<PatchMapping>)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,26 @@ import androidx.annotation.VisibleForTesting
import com.google.android.fhir.db.Database
import com.google.android.fhir.db.LocalChangeResourceReference

private typealias Node = String
/** Represents a resource e.g. 'Patient/123' , 'Encounter/123'. */
internal typealias Node = String

/**
* Represents a collection of resources with reference to other resource represented as an edge.
* e.g. Two Patient resources p1 and p2, each with an encounter and subsequent observation will be
* represented as follows
*
* ```
* [
* 'Patient/p1' : [],
* 'Patient/p2' : [],
* 'Encounter/e1' : ['Patient/p1'], // Encounter.subject
* 'Encounter/e2' : ['Patient/p2'], // Encounter.subject
* 'Observation/o1' : ['Patient/p1', 'Encounter/e1'], // Observation.subject, Observation.encounter
* 'Observation/o2' : ['Patient/p2', 'Encounter/e2'], // Observation.subject, Observation.encounter
* ]
* ```
*/
internal typealias Graph = Map<Node, List<Node>>

/**
* Orders the [PatchMapping]s to maintain referential integrity during upload.
Expand Down Expand Up @@ -53,15 +72,16 @@ internal object PatchOrdering {
* {D} (UPDATE), then B,C needs to go before the resource A so that referential integrity is
* retained. Order of D shouldn't matter for the purpose of referential integrity.
*
* @return - A ordered list of the [PatchMapping]s based on the references to other [PatchMapping]
* if the mappings are acyclic
* - throws [IllegalStateException] otherwise
* @return A ordered list of the [StronglyConnectedPatchMappings] containing:
* - [StronglyConnectedPatchMappings] with single value for the [PatchMapping] based on the
* references to other [PatchMapping] if the mappings are acyclic
* - [StronglyConnectedPatchMappings] with multiple values for [PatchMapping]s based on the
* references to other [PatchMapping]s if the mappings are cyclic.
*/
suspend fun List<PatchMapping>.orderByReferences(
suspend fun List<PatchMapping>.sccOrderByReferences(
database: Database,
): List<PatchMapping> {
): List<StronglyConnectedPatchMappings> {
val resourceIdToPatchMapping = associateBy { patchMapping -> patchMapping.resourceTypeAndId }

/* Get LocalChangeResourceReferences for all the local changes. A single LocalChange may have
multiple LocalChangeResourceReference, one for each resource reference in the
LocalChange.payload.*/
Expand All @@ -71,7 +91,10 @@ internal object PatchOrdering {
.groupBy { it.localChangeId }

val adjacencyList = createAdjacencyListForCreateReferences(localChangeIdToResourceReferenceMap)
return createTopologicalOrderedList(adjacencyList).mapNotNull { resourceIdToPatchMapping[it] }

return StronglyConnectedPatches.scc(adjacencyList).map {
StronglyConnectedPatchMappings(it.mapNotNull { resourceIdToPatchMapping[it] })
}
}

/**
Expand Down Expand Up @@ -121,22 +144,4 @@ internal object PatchOrdering {
}
return references
}

private fun createTopologicalOrderedList(adjacencyList: Map<Node, List<Node>>): List<Node> {
val stack = ArrayDeque<String>()
val visited = mutableSetOf<String>()
val currentPath = mutableSetOf<String>()

fun dfs(key: String) {
check(currentPath.add(key)) { "Detected a cycle." }
if (visited.add(key)) {
adjacencyList[key]?.forEach { dfs(it) }
stack.addFirst(key)
}
currentPath.remove(key)
}

adjacencyList.keys.forEach { dfs(it) }
return stack.reversed()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@ import com.google.android.fhir.LocalChange
* maintain an audit trail.
*/
internal object PerChangePatchGenerator : PatchGenerator {
override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> =
localChanges.map {
PatchMapping(
localChanges = listOf(it),
generatedPatch =
Patch(
resourceType = it.resourceType,
resourceId = it.resourceId,
versionId = it.versionId,
timestamp = it.timestamp,
type = it.type.toPatchType(),
payload = it.payload,
),
)
}
override suspend fun generate(
localChanges: List<LocalChange>,
): List<StronglyConnectedPatchMappings> =
localChanges
.map {
PatchMapping(
localChanges = listOf(it),
generatedPatch =
Patch(
resourceType = it.resourceType,
resourceId = it.resourceId,
versionId = it.versionId,
timestamp = it.timestamp,
type = it.type.toPatchType(),
payload = it.payload,
),
)
}
.map { StronglyConnectedPatchMappings(listOf(it)) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.github.fge.jsonpatch.JsonPatch
import com.google.android.fhir.LocalChange
import com.google.android.fhir.LocalChange.Type
import com.google.android.fhir.db.Database
import com.google.android.fhir.sync.upload.patch.PatchOrdering.orderByReferences
import com.google.android.fhir.sync.upload.patch.PatchOrdering.sccOrderByReferences

/**
* Generates a [Patch] for all [LocalChange]es made to a single FHIR resource.
Expand All @@ -35,8 +35,10 @@ import com.google.android.fhir.sync.upload.patch.PatchOrdering.orderByReferences
internal class PerResourcePatchGenerator private constructor(val database: Database) :
PatchGenerator {

override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> {
return generateSquashedChangesMapping(localChanges).orderByReferences(database)
override suspend fun generate(
localChanges: List<LocalChange>,
): List<StronglyConnectedPatchMappings> {
return generateSquashedChangesMapping(localChanges).sccOrderByReferences(database)
}

@androidx.annotation.VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync.upload.patch

import kotlin.math.min

internal object StronglyConnectedPatches {

/**
* Takes a [directedGraph] and computes all the strongly connected components in the graph.
*
* @return An ordered List of strongly connected components of the [directedGraph]. The SCCs are
* topologically ordered which may change based on the ordering algorithm and the [Node]s inside
* a SSC may be ordered randomly depending on the path taken by algorithm to discover the nodes.
*/
fun scc(directedGraph: Graph): List<List<Node>> {
return findSCCWithTarjan(directedGraph)
}

/**
* Finds strongly connected components in topological order. See
* https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
*/
private fun findSCCWithTarjan(diGraph: Graph): List<List<Node>> {
val nodeCount = (diGraph.keys + diGraph.values.flatten().toSet()).size
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
val sccs = mutableListOf<List<Node>>()
val lowLinks = mutableMapOf<Node, Int>()
var exploringCounter = 0
val discoveryTimes = mutableMapOf<Node, Int>()

val visitedNodes = BooleanArray(nodeCount)
val nodesCurrentlyInStack = BooleanArray(nodeCount)
val stack = ArrayDeque<Node>()

fun visited(node: Node) = discoveryTimes[node]?.let { visitedNodes[it] } ?: false

/**
* Discovery time of each Node is unique, starts with 0 and is linearly incremented. Thus, it
* can be used to map (index) each node in an array.
*/
fun Node.discoveryIndex() = discoveryTimes[this] ?: -1

fun dfs(at: Node) {
lowLinks[at] = exploringCounter
discoveryTimes[at] = exploringCounter
visitedNodes[exploringCounter] = true
exploringCounter++
stack.addFirst(at)
nodesCurrentlyInStack[at.discoveryIndex()] = true

diGraph[at]?.forEach {
if (!visited(it)) {
dfs(it)
}

if (nodesCurrentlyInStack[it.discoveryIndex()]) {
lowLinks[at] = min(lowLinks[at]!!, lowLinks[it]!!)
}
}

// We have found the head node in the scc.
if (lowLinks[at] == discoveryTimes[at]) {
val connected = mutableListOf<Node>()
var node: Node
do {
node = stack.removeFirst()
connected.add(node)
nodesCurrentlyInStack[node.discoveryIndex()] = false
} while (node != at && stack.isNotEmpty())
sccs.add(connected.reversed())
}
}

diGraph.keys.forEach {
if (!visited(it)) {
dfs(it)
}
}

return sccs
}
}
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package com.google.android.fhir.sync.upload.request
import com.google.android.fhir.LocalChange
import com.google.android.fhir.sync.upload.patch.Patch
import com.google.android.fhir.sync.upload.patch.PatchMapping
import com.google.android.fhir.sync.upload.patch.StronglyConnectedPatchMappings
import org.hl7.fhir.r4.model.Bundle

/** Generates list of [BundleUploadRequest] of type Transaction [Bundle] from the [Patch]es */
Expand All @@ -29,10 +30,38 @@ internal class TransactionBundleGenerator(
(patch: Patch, useETagForUpload: Boolean) -> BundleEntryComponentGenerator,
) : UploadRequestGenerator {

/**
* In order to accommodate cyclic dependencies between [PatchMapping]s and maintain referential
* integrity on the server, the [PatchMapping]s in a [StronglyConnectedPatchMappings] are all put
* in a single [BundleUploadRequestMapping]. Based on the [generatedBundleSize], the remaining
* space of the [BundleUploadRequestMapping] maybe filled with other
* [StronglyConnectedPatchMappings] mappings.
*
* In case a single [StronglyConnectedPatchMappings] has more [PatchMapping]s than the
* [generatedBundleSize], [generatedBundleSize] will be ignored so that all of the dependent
* mappings in [StronglyConnectedPatchMappings] can be sent in a single [Bundle].
*/
override fun generateUploadRequests(
mappedPatches: List<PatchMapping>,
mappedPatches: List<StronglyConnectedPatchMappings>,
): List<BundleUploadRequestMapping> {
return mappedPatches.chunked(generatedBundleSize).map { patchList ->
val mappingsPerBundle = mutableListOf<List<PatchMapping>>()

var bundle = mutableListOf<PatchMapping>()
mappedPatches.forEach {
if ((bundle.size + it.patchMappings.size) <= generatedBundleSize) {
bundle.addAll(it.patchMappings)
} else {
if (bundle.isNotEmpty()) {
mappingsPerBundle.add(bundle)
bundle = mutableListOf()
}
bundle.addAll(it.patchMappings)
}
}

if (bundle.isNotEmpty()) mappingsPerBundle.add(bundle)

return mappingsPerBundle.map { patchList ->
generateBundleRequest(patchList).let { mappedBundleRequest ->
BundleUploadRequestMapping(
splitLocalChanges = mappedBundleRequest.first,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ package com.google.android.fhir.sync.upload.request
import com.google.android.fhir.LocalChange
import com.google.android.fhir.sync.upload.patch.Patch
import com.google.android.fhir.sync.upload.patch.PatchMapping
import com.google.android.fhir.sync.upload.patch.StronglyConnectedPatchMappings
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.codesystems.HttpVerb

/**
* Generator that generates [UploadRequest]s from the [Patch]es present in the
* [List<[PatchMapping]>]. Any implementation of this generator is expected to output
* [List<[UploadRequestMapping]>] which maps [UploadRequest] to the corresponding [LocalChange]s it
* was generated from.
* [List<[StronglyConnectedPatchMappings]>]. Any implementation of this generator is expected to
* output [List<[UploadRequestMapping]>] which maps [UploadRequest] to the corresponding
* [LocalChange]s it was generated from.
*/
internal interface UploadRequestGenerator {
/** Generates a list of [UploadRequestMapping] from the [PatchMapping]s */
fun generateUploadRequests(
mappedPatches: List<PatchMapping>,
mappedPatches: List<StronglyConnectedPatchMappings>,
): List<UploadRequestMapping>
}

Expand Down
Loading
Loading