Skip to content

Commit

Permalink
DPP-460 Parameter storage consolidation (#10472)
Browse files Browse the repository at this point in the history
* Refactor ParameterStorageBackend

- a single method for atomic ledger initialization
- a single method to look up the ledger end

changelog_begin
changelog_end

* Add a test

* Fix reading event sequential ids

* Remove debug statements

* Allow ledgerEnd on an empty database

* Initialization is not safe to call concurrently

* Remove leftovers from isolation level change

* Use unit return type

for initialization methods

* Allow getParticipantId on an empty database

* Use exceptions instead of a return type ADT

* Don't use Try for initialization

* Clean up parameters table

* Simplify parameter storage api

* Address review suggestion

* Address review comment

* Address review comment

* Prefer ledger id over participant id mismatch

* Address review comment

* Move type definition

* Remove useleess new keyword

* Renove unused import

* Inline result mapping

* Fix reporting of mismatching participantId
  • Loading branch information
rautenrieth-da authored Aug 19, 2021
1 parent 569612a commit 121047e
Show file tree
Hide file tree
Showing 24 changed files with 571 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ abstract class MismatchException[A](

object MismatchException {

class LedgerId(existing: domain.LedgerId, provided: domain.LedgerId)
extends MismatchException[domain.LedgerId]("ledger id", existing, provided)
case class LedgerId(
override val existing: domain.LedgerId,
override val provided: domain.LedgerId,
) extends MismatchException[domain.LedgerId]("ledger id", existing, provided)

class ParticipantId(existing: domain.ParticipantId, provided: domain.ParticipantId)
extends MismatchException[domain.ParticipantId]("participant id", existing, provided)
case class ParticipantId(
override val existing: domain.ParticipantId,
override val provided: domain.ParticipantId,
) extends MismatchException[domain.ParticipantId]("participant id", existing, provided)

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ object SqlLedgerReaderWriter {
.flatMap { ledgerId =>
if (providedLedgerId != ledgerId) {
Failure(
new MismatchException.LedgerId(
MismatchException.LedgerId(
domain.LedgerId(ledgerId),
domain.LedgerId(providedLedgerId),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e560dcc7af3b3b333a3d61668c351c7c1a1a0ac088bf83b59e0b4699a8073951
aae0b43e4735b3ffbdb782a61bad75f741c58a282327f3d3e18b0e41da5c69f6
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ CREATE ALIAS array_intersection FOR "com.daml.platform.store.backend.h2.H2Functi
---------------------------------------------------------------------------------------------------
CREATE TABLE parameters (
ledger_id VARCHAR NOT NULL,
participant_id VARCHAR,
participant_id VARCHAR NOT NULL,
ledger_end VARCHAR,
ledger_end_sequential_id BIGINT,
external_ledger_end VARCHAR,
participant_pruned_up_to_inclusive VARCHAR
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
16adf83956d0f884d1d8886f317b3c0929839a9b0c385b5cb40c6e42cf328ef4
b5e44d2e0e0135b3b41d14ff9cde16300e58bdc7ef025c964a2d24e0da767ff2
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,7 @@ CREATE TABLE parameters
ledger_id NVARCHAR2(1000) not null,
-- stores the head offset, meant to change with every new ledger entry
ledger_end VARCHAR2(4000),
external_ledger_end NVARCHAR2(1000),
participant_id NVARCHAR2(1000),
participant_id NVARCHAR2(1000) not null,
participant_pruned_up_to_inclusive VARCHAR2(4000),
ledger_end_sequential_id NUMBER
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
77f0bfc35c00dc63e12d49256d267da19a36ca723d57a2949eae8331c52ff744
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0


---------------------------------------------------------------------------------------------------
-- V107: Parameter table cleanup
--
-- This migration makes sure the two following invariants hold:
-- - The ledger_id and participant_id columns are always defined at the same time. I.e., either
-- the table is empty, or both are defined.
-- - The ledger_end and ledger_end_sequential_id are always defined at the same time. I.e., either
-- both are NULL, or both are defined.
-- Additionally, it removes unused columns.
---------------------------------------------------------------------------------------------------


-- It is in theory possible that the participant_id column contains a NULL value.
-- This should only happen if the first-time database initialization failed half-way between writing the
-- ledger_id and participant_id. In this case, we would ask the operator to re-create or manually edit the database.
ALTER TABLE parameters ALTER COLUMN participant_id SET NOT NULL;

-- This should only apply when a database was migrated to the append-only schema, where the database contained some
-- state updates (e.g., party allocations), but no events (i.e., no transactions).
UPDATE parameters SET ledger_end_sequential_id = 0 WHERE ledger_end_sequential_id IS NULL AND ledger_end IS NOT NULL;

-- This column is not needed anymore.
ALTER TABLE parameters DROP COLUMN external_ledger_end;
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@ import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Flow, Keep, Sink}
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.ParticipantId
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.data.Ref
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.common
import com.daml.platform.common.MismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.parallel.ParallelIndexerFactory
import com.daml.platform.store.DbType.{
Expand Down Expand Up @@ -215,53 +212,30 @@ object JdbcIndexer {

private def initializeLedger(
dao: LedgerDao
)(implicit ec: ExecutionContext): Future[Option[Offset]] =
for {
initialConditions <- readService.ledgerInitialConditions().runWith(Sink.head)
existingLedgerId <- dao.lookupLedgerId()
providedLedgerId = domain.LedgerId(initialConditions.ledgerId)
_ <- existingLedgerId.fold(initializeLedgerData(providedLedgerId, dao))(
checkLedgerIds(_, providedLedgerId)
)
_ <- initOrCheckParticipantId(dao)
initialLedgerEnd <- dao.lookupInitialLedgerEnd()
} yield initialLedgerEnd

private def checkLedgerIds(
existingLedgerId: domain.LedgerId,
providedLedgerId: domain.LedgerId,
): Future[Unit] =
if (existingLedgerId == providedLedgerId) {
logger.info(s"Found existing ledger with ID: $existingLedgerId")
Future.unit
} else {
Future.failed(new MismatchException.LedgerId(existingLedgerId, providedLedgerId))
)(implicit ec: ExecutionContext): Future[Option[Offset]] = {
// In a high availability setup, multiple indexers might try to initialize the database at the same time
// Add an identifier to correlate the log output for the attempt and the result
val initializationId = java.util.UUID.randomUUID().toString
LoggingContext.withEnrichedLoggingContext(
"initialization_id" -> initializationId
) { implicit loggingContext =>
for {
initialConditions <- readService.ledgerInitialConditions().runWith(Sink.head)
providedLedgerId = domain.LedgerId(initialConditions.ledgerId)
providedParticipantId = domain.ParticipantId(
Ref.ParticipantId.assertFromString(config.participantId)
)
_ = logger.info(
s"Attempting to initialize with ledger ID $providedLedgerId and participant ID $providedParticipantId"
)
_ <- dao.initialize(
ledgerId = providedLedgerId,
participantId = providedParticipantId,
)
initialLedgerEnd <- dao.lookupInitialLedgerEnd()
} yield initialLedgerEnd
}

private def initializeLedgerData(
providedLedgerId: domain.LedgerId,
ledgerDao: LedgerDao,
): Future[Unit] = {
logger.info(s"Initializing ledger with ID: $providedLedgerId")
ledgerDao.initializeLedger(providedLedgerId)
}

private def initOrCheckParticipantId(
dao: LedgerDao
)(implicit ec: ExecutionContext): Future[Unit] = {
val id = ParticipantId(Ref.ParticipantId.assertFromString(config.participantId))
dao
.lookupParticipantId()
.flatMap(
_.fold(dao.initializeParticipantId(id)) {
case `id` =>
Future.successful(logger.info(s"Found existing participant id '$id'"))
case retrievedLedgerId =>
Future.failed(new common.MismatchException.ParticipantId(retrievedLedgerId, id))
}
)
}

}

private val logger = ContextualizedLogger.get(classOf[JdbcIndexer])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.daml.platform.store.appendonlydao.DbDispatcher
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.daml.platform.store.backend.{DbDto, StorageBackend}
import com.google.common.util.concurrent.ThreadFactoryBuilder

Expand Down Expand Up @@ -116,7 +117,8 @@ object ParallelIndexerFactory {
ingester = ingester(storageBackend.insertBatch, dbDispatcher, metrics),
tailer = tailer(storageBackend.batch(Vector.empty)),
tailingRateLimitPerSecond = tailingRateLimitPerSecond,
ingestTail = ingestTail[DB_BATCH](storageBackend.updateParams, dbDispatcher, metrics),
ingestTail =
ingestTail[DB_BATCH](storageBackend.updateLedgerEnd, dbDispatcher, metrics),
)(
InstrumentedSource
.bufferedSource(
Expand Down Expand Up @@ -168,11 +170,13 @@ object ParallelIndexerFactory {
)
.use { dbDispatcher =>
dbDispatcher
.executeSql(metrics.daml.parallelIndexer.initialization)(storageBackend.initialize)
.executeSql(metrics.daml.parallelIndexer.initialization)(
storageBackend.initializeIngestion
)
.flatMap { initialized =>
val (killSwitch, completionFuture) =
ingest(initialized.lastEventSeqId.getOrElse(0L), dbDispatcher)(
readService.stateUpdates(beginAfter = initialized.lastOffset)
ingest(initialized.map(_.lastEventSeqId).getOrElse(0L), dbDispatcher)(
readService.stateUpdates(beginAfter = initialized.map(_.lastOffset))
)
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.both)
Expand Down Expand Up @@ -332,14 +336,14 @@ object ParallelIndexerFactory {
offsets = Vector.empty, // not used anymore
)

def ledgerEndFrom(batch: Batch[_]): StorageBackend.Params =
StorageBackend.Params(
ledgerEnd = batch.lastOffset,
eventSeqId = batch.lastSeqEventId,
def ledgerEndFrom(batch: Batch[_]): LedgerEnd =
LedgerEnd(
lastOffset = batch.lastOffset,
lastEventSeqId = batch.lastSeqEventId,
)

def ingestTail[DB_BATCH](
ingestTailFunction: StorageBackend.Params => Connection => Unit,
ingestTailFunction: LedgerEnd => Connection => Unit,
dbDispatcher: DbDispatcher,
metrics: Metrics,
)(implicit loggingContext: LoggingContext): Batch[DB_BATCH] => Future[Batch[DB_BATCH]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import com.daml.platform.store.appendonlydao.events.{
QueryNonPrunedImpl,
TransactionsReader,
}
import com.daml.platform.store.backend.{StorageBackend, UpdateToDbDto}
import com.daml.platform.store.backend.{ParameterStorageBackend, StorageBackend, UpdateToDbDto}
import com.daml.platform.store.dao.ParametersTable.LedgerEndUpdateError
import com.daml.platform.store.dao.events.TransactionsWriter.PreparedInsert
import com.daml.platform.store.dao.{
Expand All @@ -58,7 +58,6 @@ import com.daml.platform.store.entries.{
PackageLedgerEntry,
PartyLedgerEntry,
}
import scalaz.syntax.tag._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -87,46 +86,59 @@ private class JdbcLedgerDao(
override def currentHealth(): HealthStatus = dbDispatcher.currentHealth()

override def lookupLedgerId()(implicit loggingContext: LoggingContext): Future[Option[LedgerId]] =
dbDispatcher.executeSql(metrics.daml.index.db.getLedgerId)(storageBackend.ledgerId)
dbDispatcher
.executeSql(metrics.daml.index.db.getLedgerId)(
storageBackend.ledgerIdentity(_).map(_.ledgerId)
)

override def lookupParticipantId()(implicit
loggingContext: LoggingContext
): Future[Option[ParticipantId]] =
dbDispatcher.executeSql(metrics.daml.index.db.getParticipantId)(storageBackend.participantId)
dbDispatcher
.executeSql(metrics.daml.index.db.getParticipantId)(
storageBackend.ledgerIdentity(_).map(_.participantId)
)

/** Defaults to Offset.begin if ledger_end is unset
*/
override def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[Offset] =
dbDispatcher.executeSql(metrics.daml.index.db.getLedgerEnd)(storageBackend.ledgerEndOffset)
dbDispatcher
.executeSql(metrics.daml.index.db.getLedgerEnd)(
storageBackend.ledgerEndOrBeforeBegin(_).lastOffset
)

case class InvalidLedgerEnd(msg: String) extends RuntimeException(msg)

override def lookupLedgerEndOffsetAndSequentialId()(implicit
loggingContext: LoggingContext
): Future[(Offset, Long)] =
dbDispatcher.executeSql(metrics.daml.index.db.getLedgerEndOffsetAndSequentialId)(
storageBackend.ledgerEndOffsetAndSequentialId
)
dbDispatcher
.executeSql(metrics.daml.index.db.getLedgerEndOffsetAndSequentialId) { connection =>
val end = storageBackend.ledgerEndOrBeforeBegin(connection)
end.lastOffset -> end.lastEventSeqId
}

override def lookupInitialLedgerEnd()(implicit
loggingContext: LoggingContext
): Future[Option[Offset]] =
dbDispatcher.executeSql(metrics.daml.index.db.getInitialLedgerEnd)(
storageBackend.initialLedgerEnd
)

override def initializeLedger(
ledgerId: LedgerId
)(implicit loggingContext: LoggingContext): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.initializeLedgerParameters) {
implicit connection =>
storageBackend.updateLedgerId(ledgerId.unwrap)(connection)
}
dbDispatcher
.executeSql(metrics.daml.index.db.getInitialLedgerEnd)(
storageBackend.ledgerEnd(_).map(_.lastOffset)
)

override def initializeParticipantId(
participantId: ParticipantId
override def initialize(
ledgerId: LedgerId,
participantId: ParticipantId,
)(implicit loggingContext: LoggingContext): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.initializeParticipantId) { implicit connection =>
storageBackend.updateParticipantId(participantId.unwrap)(connection)
}
dbDispatcher
.executeSql(metrics.daml.index.db.initializeLedgerParameters)(
storageBackend.initializeParameters(
ParameterStorageBackend.IdentityParams(
ledgerId = ledgerId,
participantId = participantId,
)
)
)

override def lookupLedgerConfiguration()(implicit
loggingContext: LoggingContext
Expand Down Expand Up @@ -710,7 +722,7 @@ private class JdbcLedgerDao(
private[this] def validateOffsetStep(offsetStep: OffsetStep, conn: Connection): Offset = {
offsetStep match {
case IncrementalOffsetStep(p, o) =>
val actualEnd = storageBackend.ledgerEndOffset(conn)
val actualEnd = storageBackend.ledgerEndOrBeforeBegin(conn).lastOffset
if (actualEnd.compareTo(p) != 0) throw LedgerEndUpdateError(p) else o
case CurrentOffset(o) => o
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@ import java.sql.Connection

import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.platform.store.backend.{
DbDto,
IngestionStorageBackend,
ParameterStorageBackend,
StorageBackend,
}
import com.daml.platform.store.backend.{DbDto, IngestionStorageBackend, ParameterStorageBackend}

import scala.util.chaining.scalaUtilChainingOps

Expand All @@ -30,7 +25,7 @@ case class SequentialWriteDaoImpl[DB_BATCH](

private def lazyInit(connection: Connection): Unit =
if (!lastEventSeqIdInitialized) {
lastEventSeqId = storageBackend.ledgerEnd(connection).lastEventSeqId.getOrElse(0)
lastEventSeqId = storageBackend.ledgerEndOrBeforeBegin(connection).lastEventSeqId
lastEventSeqIdInitialized = true
}

Expand Down Expand Up @@ -60,10 +55,10 @@ case class SequentialWriteDaoImpl[DB_BATCH](
.pipe(storageBackend.batch)
.pipe(storageBackend.insertBatch(connection, _))

storageBackend.updateParams(
StorageBackend.Params(
ledgerEnd = offset,
eventSeqId = lastEventSeqId,
storageBackend.updateLedgerEnd(
ParameterStorageBackend.LedgerEnd(
lastOffset = offset,
lastEventSeqId = lastEventSeqId,
)
)(connection)
}
Expand Down
Loading

0 comments on commit 121047e

Please sign in to comment.