Make Index DB enable multiple party additions [DPP-546] (#10623)
Preparation, small fixes

* Remove unnecessary TODO
* Fix FieldStrategy.idempotentInsert type parameters

Changing the DB schema

This change adapts the DB schema for all supported backends. After this change, only the party_entries table is populated; on the query side, the party state is reconstructed from it.

* Drop the parties table
* Add indexing to party_entries

Adapting StorageBackend: ingestion

Since we now only ingest party_entries, the code populating the parties table needs to be removed.

* Remove the parties table from the ingestion code
* Fix test

Adapting StorageBackend: queries

Queries need to be adapted to reconstruct the state on the read side (a sketch follows the list below).

* Rewrite queries
* Fix reset implementations
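
A minimal sketch of the read-side query shape, assuming it joins against the parameters table the way the existing queries in this codebase do (the object and query names and the literal 'accept' value are illustrative; deduplicating multiple entries per party is addressed in the review follow-up further down):

import anorm.{SQL, SqlQuery}

object PartyQueriesSketch {
  // Illustrative: known parties are reconstructed from the accepted entries in
  // party_entries up to the current ledger end. The new composite index on
  // (party, ledger_offset) keeps per-party lookups cheap.
  val SQL_KNOWN_PARTY_ENTRIES: SqlQuery = SQL(
    """select party_entries.party, party_entries.display_name, party_entries.is_local
      |from party_entries, parameters
      |where party_entries.typ = 'accept'
      |  and party_entries.ledger_offset <= parameters.ledger_end""".stripMargin
  )
}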

Adapting JdbcLedgerDao

Since the underlying storage changed, JdbcLedgerDao can be simplified: duplicate errors no longer need special treatment, because they can no longer occur.
The JdbcLedgerDao tests are removed, and a new test is added that verifies the behavior of the new event-sourced party model. Please note: this database refactoring only applies to the append-only schema, so the test is disabled for the mutating schema.
During implementation a bug surfaced: it was no longer possible to store the is_local information via JdbcLedgerDao.storePartyEntry. Although this is a minor issue, since that method is only used in single-participant environments, a fix was implemented as well: a magic participantId is passed when storing a non-local party (see the sketch after the list below).

* Simplify storePartyEntry
* Fix bug introduced by append-only
* Add/adapt tests
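
A condensed sketch of the fix. The sentinel value below matches the diff further down; the helper function and its name are purely illustrative:

import com.daml.lf.data.Ref

object NonLocalPartySketch {
  // state.Update.PartyAddedToParticipant carries a participantId rather than an
  // is_local flag, so a sentinel participant id is used to encode isLocal = false;
  // the read side then derives is_local by comparing against the local id.
  val NonLocalParticipantId: Ref.ParticipantId =
    Ref.ParticipantId.assertFromString("RESTRICTED_NON_LOCAL_PARTICIPANT_ID")

  // Illustrative helper (not part of the commit): pick the participant id to
  // transmit for a party, given whether it is locally hosted.
  def participantIdFor(
      isLocal: Boolean,
      localParticipantId: Ref.ParticipantId,
  ): Ref.ParticipantId =
    if (isLocal) localParticipantId else NonLocalParticipantId
}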

Refactoring: remove unused duplicateKeyError from StorageBackend

Changes to JdbcLedgerDao rendered this duplicateKeyError unused.

* Remove unused duplicateKeyError

Adapting sandbox-classic

In sandbox-classic, updates to parties are not allowed: updates concerning already-existing parties were silently dropped (logged, but without effect).
I started by pinning down this behaviour in SqlLedgerSpec and SqlLedgerSpecAppendOnly; these tests were implemented with the original code in mind.
Then I adapted the SqlLedger method: uniqueness is ensured by first looking up the to-be-persisted party (a sketch follows the list below).

* Added tests capturing the original behavior
* Adapted the implementation to ensure the same behavior
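
A minimal, self-contained sketch of the lookup-before-persist idea (all names and signatures here are assumed; the actual change lives in SqlLedger and goes through its DAO):

import scala.concurrent.{ExecutionContext, Future}

object UniquePartySketch {
  // Illustrative: ensure uniqueness by looking the party up before persisting;
  // a duplicate allocation is dropped without effect, as in the original code.
  def storePartyIfAbsent[A](
      party: String,
      lookup: String => Future[Option[A]], // assumed: lookup by party id
      persist: String => Future[Unit], // assumed: persistence action
  )(implicit ec: ExecutionContext): Future[Boolean] =
    lookup(party).flatMap {
      case Some(_) => Future.successful(false) // already allocated: drop silently
      case None => persist(party).map(_ => true)
    }
}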

Switching to the correct is_local derivation for party queries as per review

* Adapting the implementation: switching to aggregation over party_entries and a query on that (sketched below)
* Introducing QueryStrategy.booleanOrAggregationFunction to support Oracle
* Moving party-related queries to PartyStorageBackendTemplate
* Fixing JdbcLedgerDaoPartiesSpec tests and adding another test case
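
A sketch of the aggregated query shape, in Postgres/H2 spelling (the query text and names are illustrative; Oracle has no boolean OR aggregate, which is why QueryStrategy.booleanOrAggregationFunction abstracts over the function name):

import anorm.{SQL, SqlQuery}

object AggregatedPartiesSketch {
  // Illustrative: each party is aggregated over all of its accepted entries,
  // and is_local is OR-ed across them, so allocating a party again cannot
  // erase the fact that it is locally hosted. bool_or is the Postgres/H2
  // aggregate; Oracle goes through booleanOrAggregationFunction instead.
  val SQL_AGGREGATED_PARTIES: SqlQuery = SQL(
    """select party_entries.party,
      |       max(party_entries.display_name) as display_name,
      |       bool_or(party_entries.is_local) as is_local
      |from party_entries, parameters
      |where party_entries.typ = 'accept'
      |  and party_entries.ledger_offset <= parameters.ledger_end
      |group by party_entries.party""".stripMargin
  )
}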

Also:

* Align Update interface documentation
* Switching to explicit optionality in the party query implementation as per review

Co-authored-by: Simon Meier <[email protected]>

CHANGELOG_BEGIN
CHANGELOG_END
nmarton-da authored Aug 24, 2021
1 parent b22de68 commit 640fb68
Showing 34 changed files with 547 additions and 342 deletions.
@@ -1 +1 @@
aae0b43e4735b3ffbdb782a61bad75f741c58a282327f3d3e18b0e41da5c69f6
219563772b2b7a88b3e70f27a35c94ff7d379380a20276c654fdd3279e7a6e5e
@@ -68,19 +68,6 @@ CREATE TABLE package_entries (

CREATE INDEX idx_package_entries ON package_entries (submission_id);

---------------------------------------------------------------------------------------------------
-- Parties table
---------------------------------------------------------------------------------------------------
CREATE TABLE parties (
party VARCHAR PRIMARY KEY NOT NULL,
display_name VARCHAR,
explicit BOOLEAN NOT NULL,
ledger_offset VARCHAR,
is_local BOOLEAN NOT NULL
);

CREATE INDEX idx_parties_ledger_offset ON parties (ledger_offset);

---------------------------------------------------------------------------------------------------
-- Party entries table
---------------------------------------------------------------------------------------------------
@@ -102,6 +89,7 @@ CREATE TABLE party_entries (
);

CREATE INDEX idx_party_entries ON party_entries (submission_id);
CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, ledger_offset);

---------------------------------------------------------------------------------------------------
-- Submissions table
@@ -1 +1 @@
c9148396eec01471c1135ff384d0b83442442ada1d6ca12d731f8e84b6f4869f
bd2c4bfb724003409c5eafdb9d62df264b7d659f6b3ae9ff5122e57366201f02
@@ -9,21 +9,6 @@
-- reconstructed from the log of create and archive events.
---------------------------------------------------------------------------------------------------

CREATE TABLE parties
(
-- The unique identifier of the party
party NVARCHAR2(1000) primary key not null,
-- A human readable name of the party, might not be unique
display_name NVARCHAR2(1000),
-- True iff the party was added explicitly through an API call
explicit NUMBER(1, 0) not null,
-- For implicitly added parties: the offset of the transaction that introduced the party
-- For explicitly added parties: the ledger end at the time when the party was added
ledger_offset VARCHAR2(4000),
is_local NUMBER(1, 0) not null
);
CREATE INDEX parties_ledger_offset_idx ON parties(ledger_offset);

CREATE TABLE packages
(
-- The unique identifier of the package (the hash of its content)
@@ -121,6 +106,7 @@ CREATE TABLE party_entries
)
);
CREATE INDEX idx_party_entries ON party_entries(submission_id);
CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, ledger_offset);

CREATE TABLE participant_command_completions
(
@@ -0,0 +1 @@
726147a3db60eaf4e65a8a761e20caee0b6e89d49b2b08e1c0dfca50da11cd1f
@@ -0,0 +1,5 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

DROP TABLE parties;
CREATE INDEX idx_party_entries_party_and_ledger_offset ON party_entries(party, ledger_offset);
@@ -61,8 +61,6 @@ import com.daml.platform.store.entries.{

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal

private class JdbcLedgerDao(
dbDispatcher: DbDispatcher,
@@ -223,40 +221,37 @@ private class JdbcLedgerDao(
}
}

private val NonLocalParticipantId =
Ref.ParticipantId.assertFromString("RESTRICTED_NON_LOCAL_PARTICIPANT_ID")

override def storePartyEntry(
offsetStep: OffsetStep,
partyEntry: PartyLedgerEntry,
)(implicit loggingContext: LoggingContext): Future[PersistenceResponse] = {
logger.info("Storing party entry")
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDbMetrics) { implicit conn =>
val savepoint = conn.setSavepoint()
val offset = validateOffsetStep(offsetStep, conn)
partyEntry match {
case PartyLedgerEntry.AllocationAccepted(submissionIdOpt, recordTime, partyDetails) =>
Try({
sequentialIndexer.store(
conn,
offset,
Some(
state.Update.PartyAddedToParticipant(
party = partyDetails.party,
displayName = partyDetails.displayName.orNull,
participantId = participantId,
recordTime = Time.Timestamp.assertFromInstant(recordTime),
submissionId = submissionIdOpt,
)
),
)
PersistenceResponse.Ok
}).recover {
case NonFatal(e) if e.getMessage.contains(storageBackend.duplicateKeyError) =>
logger.warn(
s"Ignoring duplicate party submission with ID ${partyDetails.party} for submissionId $submissionIdOpt"
sequentialIndexer.store(
conn,
offset,
Some(
state.Update.PartyAddedToParticipant(
party = partyDetails.party,
displayName = partyDetails.displayName.orNull,
// HACK: the `PartyAddedToParticipant` transmits `participantId`s, while here we only have the information
// whether the party is locally hosted or not. We use the `nonLocalParticipantId` to get the desired effect of
// the `isLocal = False` information to be transmitted via a `PartyAddedToParticipant` `Update`.
//
// This will be properly resolved once we move away from the `sandbox-classic` codebase.
participantId = if (partyDetails.isLocal) participantId else NonLocalParticipantId,
recordTime = Time.Timestamp.assertFromInstant(recordTime),
submissionId = submissionIdOpt,
)
conn.rollback(savepoint)
sequentialIndexer.store(conn, offset, None)
PersistenceResponse.Duplicate
}.get
),
)
PersistenceResponse.Ok

case PartyLedgerEntry.AllocationRejected(submissionId, recordTime, reason) =>
sequentialIndexer.store(
@@ -118,14 +118,6 @@ object DbDto {
is_local: Option[Boolean],
) extends DbDto

final case class Party(
party: String,
display_name: Option[String],
explicit: Boolean,
ledger_offset: Option[String],
is_local: Boolean,
) extends DbDto

final case class CommandCompletion(
completion_offset: String,
record_time: Instant,
@@ -63,7 +63,6 @@ trait StorageBackend[DB_BATCH]
* The result is a database that looks the same as a freshly created database with Flyway migrations applied.
*/
def resetAll(connection: Connection): Unit
def duplicateKeyError: String // TODO: Avoid brittleness of error message checks
}

trait IngestionStorageBackend[DB_BATCH] {
@@ -229,7 +228,6 @@ trait ContractStorageBackend {
): Vector[StorageBackend.RawContractStateEvent]
}

// TODO append-only: Event related query consolidation
trait EventStorageBackend {

/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
@@ -83,14 +83,7 @@ object UpdateToDbDto {
typ = JdbcLedgerDao.acceptType,
rejection_reason = None,
is_local = Some(u.participantId == participantId),
),
DbDto.Party(
party = u.party,
display_name = Some(u.displayName),
explicit = true,
ledger_offset = Some(offset.toHexString),
is_local = u.participantId == participantId,
),
)
)

case u: PartyAllocationRejected =>
@@ -7,9 +7,9 @@ import java.sql.Connection
import java.time.Instant
import java.util.Date

import anorm.SqlParser.{array, binaryStream, bool, byteArray, date, flatten, int, long, str}
import anorm.{Macro, RowParser, SQL, SqlParser, SqlQuery, SqlStringInterpolation, ~}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails}
import anorm.SqlParser.{array, binaryStream, byteArray, date, flatten, int, long, str}
import anorm.{Macro, RowParser, SQL, SqlParser, SqlQuery, ~}
import com.daml.ledger.api.domain.{LedgerId, ParticipantId}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.PackageDetails
@@ -20,9 +20,7 @@ import com.daml.platform.store.Conversions.{
instant,
ledgerString,
offset,
party,
}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.PackageId
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.common.MismatchException
@@ -31,7 +29,8 @@ import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.JdbcLedgerDao.{acceptType, rejectType}
import com.daml.platform.store.backend.{ParameterStorageBackend, StorageBackend}
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry}
import com.daml.scalautil.Statement.discard
import scalaz.syntax.tag._

@@ -53,7 +52,6 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
SQL(
"DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > {ledger_offset}"
),
SQL("DELETE FROM parties WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM party_entries WHERE ledger_offset > {ledger_offset}"),
)

@@ -308,127 +306,6 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
.asVectorOf(configurationEntryParser)(connection)
}

// Parties

private val SQL_GET_PARTY_ENTRIES = SQL(
"""select * from party_entries
|where ({startExclusive} is null or ledger_offset > {startExclusive}) and ledger_offset <= {endInclusive}
|order by ledger_offset asc
|offset {queryOffset} rows
|fetch next {pageSize} rows only""".stripMargin
)

private val partyEntryParser: RowParser[(Offset, PartyLedgerEntry)] = {
import com.daml.platform.store.Conversions.bigDecimalColumnToBoolean
(offset("ledger_offset") ~
date("recorded_at") ~
ledgerString("submission_id").? ~
party("party").? ~
str("display_name").? ~
str("typ") ~
str("rejection_reason").? ~
bool("is_local").?)
.map(flatten)
.map {
case (
offset,
recordTime,
submissionIdOpt,
Some(party),
displayNameOpt,
`acceptType`,
None,
Some(isLocal),
) =>
offset ->
PartyLedgerEntry.AllocationAccepted(
submissionIdOpt,
recordTime.toInstant,
PartyDetails(party, displayNameOpt, isLocal),
)
case (
offset,
recordTime,
Some(submissionId),
None,
None,
`rejectType`,
Some(reason),
None,
) =>
offset -> PartyLedgerEntry.AllocationRejected(
submissionId,
recordTime.toInstant,
reason,
)
case invalidRow =>
sys.error(s"getPartyEntries: invalid party entry row: $invalidRow")
}
}

def partyEntries(
startExclusive: Offset,
endInclusive: Offset,
pageSize: Int,
queryOffset: Long,
)(connection: Connection): Vector[(Offset, PartyLedgerEntry)] = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_GET_PARTY_ENTRIES
.on(
"startExclusive" -> startExclusive,
"endInclusive" -> endInclusive,
"pageSize" -> pageSize,
"queryOffset" -> queryOffset,
)
.asVectorOf(partyEntryParser)(connection)
}

private val SQL_SELECT_MULTIPLE_PARTIES =
SQL(
"select parties.party, parties.display_name, parties.ledger_offset, parties.explicit, parties.is_local from parties, parameters where party in ({parties}) and parties.ledger_offset <= parameters.ledger_end"
)

private case class ParsedPartyData(
party: String,
displayName: Option[String],
ledgerOffset: Offset,
explicit: Boolean,
isLocal: Boolean,
)

private val PartyDataParser: RowParser[ParsedPartyData] = {
import com.daml.platform.store.Conversions.columnToOffset
import com.daml.platform.store.Conversions.bigDecimalColumnToBoolean
Macro.parser[ParsedPartyData](
"party",
"display_name",
"ledger_offset",
"explicit",
"is_local",
)
}

private def constructPartyDetails(data: ParsedPartyData): PartyDetails =
PartyDetails(Ref.Party.assertFromString(data.party), data.displayName, data.isLocal)

def parties(parties: Seq[Ref.Party])(connection: Connection): List[PartyDetails] = {
import com.daml.platform.store.Conversions.partyToStatement
SQL_SELECT_MULTIPLE_PARTIES
.on("parties" -> parties)
.as(PartyDataParser.*)(connection)
.map(constructPartyDetails)
}

private val SQL_SELECT_ALL_PARTIES =
SQL(
"select parties.party, parties.display_name, parties.ledger_offset, parties.explicit, parties.is_local from parties, parameters where parameters.ledger_end >= parties.ledger_offset"
)

def knownParties(connection: Connection): List[PartyDetails] =
SQL_SELECT_ALL_PARTIES
.as(PartyDataParser.*)(connection)
.map(constructPartyDetails)

// Packages

private val SQL_SELECT_PACKAGES =
@@ -484,8 +361,8 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_

private val SQL_GET_PACKAGE_ENTRIES = SQL(
"""select * from package_entries
|where ({startExclusive} is null or ledger_offset > {startExclusive})
|and ledger_offset <= {endInclusive}
|where ({startExclusive} is null or ledger_offset>{startExclusive})
|and ledger_offset<={endInclusive}
|order by ledger_offset asc
|offset {queryOffset} rows
|fetch next {pageSize} rows only""".stripMargin
