Skip to content

Commit

Permalink
[Divulgence pruning] All divulgence events pruning [DPP-483] (#10634)
Browse files Browse the repository at this point in the history
* All divulgence events pruning
* Add `pruneAllDivulgedContracts` to `IndexParticipantPruningService`
* Add `participant_pruned_all_contracts_up_to_inclusive` in `parameters` table
* `pruneEvents` prunes all divulgence events
* Adapt `ParticipantPruningIT` to check for divulgence with and without `pruna_all_divulged_contracts`
* Create multi-participant pruning test in ledger-on-memory

CHANGELOG_BEGIN
CHANGELOG_END

* CommonStorageBackend
* Enrich pruning descriptions
* Logging for command completions pruning
* Move divulgence pruning back to the beginning of pruning sequence

* Addressed review comments

* PRDivulgenceArchivalPruning adapted for privacy-aware ledgers

* Rebased to main

* Disable ParticipantPruningIT tests targeting append-only schema in mutable schema conformance tests in ledger-on-sql

* Adapted order of SQL migration queries

* Documentation of appendonlydao.JdbcLedgerDao.prune

* Apply suggestions from code review

Co-authored-by: Simon Meier <[email protected]>

Co-authored-by: Simon Meier <[email protected]>
  • Loading branch information
tudor-da and meiersi-da authored Aug 26, 2021
1 parent 0b7980d commit 96ad9b5
Show file tree
Hide file tree
Showing 32 changed files with 389 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,24 @@ final class FutureAssertions[T](future: Future[T]) {
* It doesn't tell us what the value actually was.
*/
def mustFail(context: String)(implicit executionContext: ExecutionContext): Future[Throwable] =
handle(_ => true, context)

/** Checks that the future failed satisfying the predicate and returns the throwable.
* We use this instead of `Future#failed` because the error message that delivers is unhelpful.
* It doesn't tell us what the value actually was.
*/
def mustFailWith(context: String)(
predicate: Throwable => Boolean
)(implicit executionContext: ExecutionContext): Future[Throwable] =
handle(predicate, context)

private def handle(predicate: Throwable => Boolean, context: String)(implicit
executionContext: ExecutionContext
): Future[Throwable] =
future.transform {
case Failure(throwable) =>
Success(throwable)
case Success(value) =>
Failure(new ExpectedFailureException(context, value))
case Failure(throwable) if predicate(throwable) => Success(throwable)
case Success(value) => Failure(new ExpectedFailureException(context, value))
case Failure(other) => Failure(other)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,8 +724,7 @@ private[testtool] final class ParticipantTestContext private[participant] (
def prune(
pruneUpTo: LedgerOffset,
attempts: Int = 10,
// TODO Divulgence pruning: Change default to `true` once all divulgence pruning is implemented
pruneAllDivulgedContracts: Boolean = false,
pruneAllDivulgedContracts: Boolean = true,
): Future[PruneResponse] =
prune(pruneUpTo.getAbsolute, attempts, pruneAllDivulgedContracts)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class ParticipantPruningIT extends LedgerTestSuite {
)(implicit ec => { case Participants(Participant(participant)) =>
for {
failure <- participant
// TODO Divulgence pruning: Change to `true` once all divulgence pruning is implemented
.prune("", attempts = 1, pruneAllDivulgedContracts = false)
.prune("", attempts = 1, pruneAllDivulgedContracts = true)
.mustFail("pruning without specifying an offset")
} yield {
assertGrpcError(failure, Status.Code.INVALID_ARGUMENT, "prune_up_to not specified")
Expand All @@ -47,8 +46,7 @@ class ParticipantPruningIT extends LedgerTestSuite {
)(implicit ec => { case Participants(Participant(participant)) =>
for {
cannotPruneNonHexOffset <- participant
// TODO Divulgence pruning: Change to `true` once all divulgence pruning is implemented
.prune("covfefe", attempts = 1, pruneAllDivulgedContracts = false)
.prune("covfefe", attempts = 1, pruneAllDivulgedContracts = true)
.mustFail("pruning, specifiying a non-hexadecimal offset")
} yield {
assertGrpcError(
Expand Down Expand Up @@ -531,6 +529,56 @@ class ParticipantPruningIT extends LedgerTestSuite {
}
})

test(
"PRDivulgenceArchivalPruning",
"Prune succeeds for divulgence events whose contracts are archived",
allocate(TwoParties),
runConcurrently = false, // pruning call may interact with other tests
)(implicit ec => { case Participants(Participant(participant, alice, bob)) =>
for {
divulgence <- createDivulgence(alice, bob, participant, participant)
contract <- participant.create(alice, Contract(alice))

// Retroactively divulge Alice's contract to bob
_ <- participant.exercise(
alice,
divulgence.exerciseDivulge(_, contract),
)

// Bob can see the divulged contract
_ <- participant.exerciseAndGetContract[Dummy](
bob,
divulgence.exerciseCanFetch(_, contract),
)

_ <- pruneAtCurrentOffset(participant, bob, pruneAllDivulgedContracts = false)

// Bob can still see the divulged contract
_ <- participant.exerciseAndGetContract[Dummy](
bob,
divulgence.exerciseCanFetch(_, contract),
)

// Archive the divulged contract
_ <- participant.exercise(alice, contract.exerciseArchive)

_ <- pruneAtCurrentOffset(participant, bob, pruneAllDivulgedContracts = false)

_ <- participant
.exerciseAndGetContract[Dummy](
bob,
divulgence.exerciseCanFetch(_, contract),
)
.mustFailWith("Bob cannot access a divulged contract which was already archived") {
exception =>
val errorMessage = exception.getMessage
errorMessage.contains(
"Contract could not be found with id"
) && errorMessage.contains(contract.toString)
}
} yield ()
})

test(
"PRRetroactiveDivulgences",
"Divulgence pruning succeeds",
Expand All @@ -547,7 +595,15 @@ class ParticipantPruningIT extends LedgerTestSuite {
divulgence.exerciseDivulge(_, contract),
)

_ <- divulgencePruneAndCheck(alice, bob, alpha, beta, contract, divulgence)
_ <- divulgencePruneAndCheck(
alice,
bob,
alpha,
beta,
contract,
divulgence,
disclosureVisibility = false,
)
} yield ()
})

Expand All @@ -568,7 +624,15 @@ class ParticipantPruningIT extends LedgerTestSuite {
divulgeNotDiscloseTemplate.exerciseDivulgeNoDisclose(_, divulgence),
)

_ <- divulgencePruneAndCheck(alice, bob, alpha, beta, contract, divulgence)
_ <- divulgencePruneAndCheck(
alice,
bob,
alpha,
beta,
contract,
divulgence,
disclosureVisibility = false,
)
} yield ()
})

Expand All @@ -585,7 +649,15 @@ class ParticipantPruningIT extends LedgerTestSuite {
alice,
divulgence.exerciseCreateAndDisclose,
)
_ <- divulgencePruneAndCheck(alice, bob, alpha, beta, contract, divulgence)
_ <- divulgencePruneAndCheck(
alice,
bob,
alpha,
beta,
contract,
divulgence,
disclosureVisibility = true,
)
} yield ()
})

Expand All @@ -607,6 +679,8 @@ class ParticipantPruningIT extends LedgerTestSuite {
beta: ParticipantTestContext,
contract: Primitive.ContractId[Contract],
divulgence: binding.Primitive.ContractId[Divulgence],
// TODO Divulgence pruning: Remove when immediate divulgence pruning is implemented
disclosureVisibility: Boolean,
)(implicit ec: ExecutionContext) =
for {
// Check that Bob can fetch the contract
Expand All @@ -615,7 +689,7 @@ class ParticipantPruningIT extends LedgerTestSuite {
divulgence.exerciseCanFetch(_, contract),
)

offsetAfter_divulgence_1 <- beta.currentEnd()
offsetAfterDivulgence_1 <- beta.currentEnd()

// Alice re-divulges the contract to Bob
_ <- alpha.exerciseAndGetContract[Contract](
Expand All @@ -628,33 +702,34 @@ class ParticipantPruningIT extends LedgerTestSuite {
bob,
divulgence.exerciseCanFetch(_, contract),
)
offsetAfter_divulgence_2 <- beta.currentEnd()

_ <- beta.prune(offsetAfter_divulgence_1)
_ <- beta.prune(offsetAfterDivulgence_1)
// Check that Bob can still fetch the contract after pruning the first transaction
_ <- beta.exerciseAndGetContract[Dummy](
bob,
divulgence.exerciseCanFetch(_, contract),
)

_ <- beta.prune(offsetAfter_divulgence_2)
// TODO divulgence pruning: Remove - The following assertion should fail once full divulgence pruning
// is implemented in the participant
_ <- beta
.exerciseAndGetContract[Dummy](
bob,
divulgence.exerciseCanFetch(_, contract),
)

// TODO divulgence pruning: Un-comment the assertion below to make tests pass once
// full divulgence pruning is implemented in the participant
//
// _ <- beta
// .exerciseAndGetContract[Dummy](
// bob,
// divulgence.exerciseCanFetch(_, contract),
// )
// .mustFail("Bob cannot access the divulged contract after the second pruning")
_ <- pruneAtCurrentOffset(beta, bob, pruneAllDivulgedContracts = true)

// TODO Divulgence pruning: Check ACS equality before and after pruning
// TODO Divulgence pruning: Remove the true-clause of the if-statement below
// when disclosure pruning is implemented.
_ <-
if (disclosureVisibility) {
beta
.exerciseAndGetContract[Dummy](
bob,
divulgence.exerciseCanFetch(_, contract),
)
} else {
beta
.exerciseAndGetContract[Dummy](
bob,
divulgence.exerciseCanFetch(_, contract),
)
.mustFail("Bob cannot access the divulged contract after the second pruning")
}
} yield ()

private def populateLedgerAndGetOffsets(participant: ParticipantTestContext, submitter: Party)(
Expand Down Expand Up @@ -702,4 +777,16 @@ class ParticipantPruningIT extends LedgerTestSuite {
participant.getTransactionsRequest(parties = Seq(submitter), begin = endOffsetAtTestStart)
)
} yield trees

private def pruneAtCurrentOffset(
participant: ParticipantTestContext,
party: Party,
pruneAllDivulgedContracts: Boolean,
)(implicit ec: ExecutionContext): Future[Unit] =
for {
offset <- participant.currentEnd()
// Dummy needed to prune at this offset
_ <- participant.create(party, Dummy(party))
_ <- participant.prune(offset, pruneAllDivulgedContracts = pruneAllDivulgedContracts)
} yield ()
}
22 changes: 22 additions & 0 deletions ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ conformance_test(
],
)

conformance_test(
name = "conformance-test-multi-participant-pruning",
ports = [
6865,
6866,
],
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--index-append-only-schema",
"--mutable-contract-state-cache",
"--participant=participant-id=example1,port=6865",
"--participant=participant-id=example2,port=6866",
],
test_tool_args = [
"--verbose",
"--include=ParticipantPruningIT",
],
)

conformance_test(
name = "conformance-test-split-participant",
ports = [
Expand Down Expand Up @@ -205,6 +225,8 @@ conformance_test(
test_tool_args = [
"--verbose",
"--include=ParticipantPruningIT",
# Disable tests targeting only append-only schema functionality
"--exclude=ParticipantPruningIT:PRLocalAndNonLocalRetroactiveDivulgences,ParticipantPruningIT:PRRetroactiveDivulgences",
],
)

Expand Down
4 changes: 4 additions & 0 deletions ledger/ledger-on-sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ da_scala_test_suite(
test_tool_args = db.get("conformance_test_tool_args", []) + [
"--verbose",
"--include=ParticipantPruningIT",
# Disable tests targeting only append-only schema functionality
"--exclude=ParticipantPruningIT:PRLocalAndNonLocalRetroactiveDivulgences,ParticipantPruningIT:PRRetroactiveDivulgences",
],
),
conformance_test(
Expand Down Expand Up @@ -479,6 +481,8 @@ conformance_test(
test_tool_args = [
"--verbose",
"--include=ParticipantPruningIT",
# Disable tests targeting only append-only schema functionality
"--exclude=ParticipantPruningIT:PRLocalAndNonLocalRetroactiveDivulgences,ParticipantPruningIT:PRRetroactiveDivulgences",
],
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
219563772b2b7a88b3e70f27a35c94ff7d379380a20276c654fdd3279e7a6e5e
a87ade99f0baf6a646b40606a6d8e254f2b1e2776299dc80929e6dcce489c4c3
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ CREATE TABLE parameters (
participant_id VARCHAR NOT NULL,
ledger_end VARCHAR,
ledger_end_sequential_id BIGINT,
participant_pruned_up_to_inclusive VARCHAR
participant_pruned_up_to_inclusive VARCHAR,
participant_all_divulged_contracts_pruned_up_to_inclusive VARCHAR
);

---------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
bd2c4bfb724003409c5eafdb9d62df264b7d659f6b3ae9ff5122e57366201f02
0baa5160dbf8f3f5336824de0e8ea123ad3d5101d2fbc850ca4777fc45520b8c
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ CREATE TABLE parameters
ledger_end VARCHAR2(4000),
participant_id NVARCHAR2(1000) not null,
participant_pruned_up_to_inclusive VARCHAR2(4000),
participant_all_divulged_contracts_pruned_up_to_inclusive VARCHAR2(4000),
ledger_end_sequential_id NUMBER
);

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9c8d167a4c2ffa4a57865f217ea9d7997de1d7b90f5ea13c353ad28c70896c4a
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE parameters
ADD COLUMN participant_all_divulged_contracts_pruned_up_to_inclusive VARCHAR;
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ private[daml] final class SpannedIndexService(delegate: IndexService) extends In
)(implicit loggingContext: LoggingContext): Future[Unit] =
delegate.stopDeduplicatingCommand(commandId, submitter)

override def prune(pruneUpToInclusive: Offset)(implicit
override def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit] =
delegate.prune(pruneUpToInclusive)
delegate.prune(pruneUpToInclusive, pruneAllDivulgedContracts)

override def currentHealth(): HealthStatus =
delegate.currentHealth()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,12 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met
)

override def prune(
pruneUpToInclusive: Offset
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,
)(implicit loggingContext: LoggingContext): Future[Unit] =
Timed.future(
metrics.daml.services.index.prune,
delegate.prune(pruneUpToInclusive),
delegate.prune(pruneUpToInclusive, pruneAllDivulgedContracts),
)

override def currentHealth(): HealthStatus =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ final class ApiParticipantPruningService private (
// systems back in sync by reissuing the prune request at the currently specified or later offset.
_ <- pruneWriteService(pruneUpTo, submissionId, request.pruneAllDivulgedContracts)

pruneResponse <- pruneLedgerApiServerIndex(pruneUpTo)
pruneResponse <- pruneLedgerApiServerIndex(
pruneUpTo,
request.pruneAllDivulgedContracts,
)

} yield pruneResponse).andThen(logger.logErrorsOnCall[PruneResponse])
},
Expand Down Expand Up @@ -102,11 +105,12 @@ final class ApiParticipantPruningService private (
}

private def pruneLedgerApiServerIndex(
pruneUpTo: Offset
pruneUpTo: Offset,
pruneAllDivulgedContracts: Boolean,
)(implicit logCtx: LoggingContext): Future[PruneResponse] = {
logger.info(s"About to prune ledger api server index to ${pruneUpTo.toApiString} inclusively")
readBackend
.prune(pruneUpTo)
.prune(pruneUpTo, pruneAllDivulgedContracts)
.map { _ =>
logger.info(s"Pruned ledger api server index up to ${pruneUpTo.toApiString} inclusively.")
PruneResponse()
Expand Down
Loading

0 comments on commit 96ad9b5

Please sign in to comment.