Skip to content

Commit

Permalink
Check Postgres version at startup
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
rautenrieth-da committed Aug 12, 2021
1 parent 68f04e7 commit 1f98445
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ final class Metrics(val registry: MetricRegistry) {
val execAll: Timer = overall.executionTimer

val getCompletions: DatabaseMetrics = createDbMetrics("get_completions")
val checkCompatibility: DatabaseMetrics = createDbMetrics("check_compatibility")
val getLedgerId: DatabaseMetrics = createDbMetrics("get_ledger_id")
val getParticipantId: DatabaseMetrics = createDbMetrics("get_participant_id")
val getLedgerEnd: DatabaseMetrics = createDbMetrics("get_ledger_end")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ private class JdbcLedgerDao(

override def currentHealth(): HealthStatus = dbDispatcher.currentHealth()

/** Returns a failed Future if the Dao is connected to an incompatible storage (e.g., database version too old). */
def checkCompatibility()(implicit loggingContext: LoggingContext): Future[Unit] =
dbDispatcher
.executeSql(metrics.daml.index.db.checkCompatibility)(storageBackend.checkCompatibility)

override def lookupLedgerId()(implicit loggingContext: LoggingContext): Future[Option[LedgerId]] =
dbDispatcher
.executeSql(metrics.daml.index.db.getLedgerId)(storageBackend.ledgerIdentity)
Expand Down Expand Up @@ -890,26 +895,28 @@ private[platform] object JdbcLedgerDao {
connectionTimeout,
metrics,
)
} yield new JdbcLedgerDao(
dbDispatcher,
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
validate,
metrics,
lfValueTranslationCache,
validatePartyAllocation,
enricher,
sequentialWriteDao(
participantId,
lfValueTranslationCache,
ledgerDao = new JdbcLedgerDao(
dbDispatcher,
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
validate,
metrics,
compressionStrategy,
lfValueTranslationCache,
validatePartyAllocation,
enricher,
sequentialWriteDao(
participantId,
lfValueTranslationCache,
metrics,
compressionStrategy,
storageBackend,
),
participantId,
storageBackend,
),
participantId,
storageBackend,
)
)
_ <- ResourceOwner.forFuture(ledgerDao.checkCompatibility)
} yield ledgerDao
}

val acceptType = "accept"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ trait StorageBackend[DB_BATCH]
with DBLockStorageBackend {
def reset(connection: Connection): Unit
def duplicateKeyError: String // TODO: Avoid brittleness of error message checks
def checkCompatibility(connection: Connection)(implicit loggingContext: LoggingContext): Unit
}

trait IngestionStorageBackend[DB_BATCH] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ private[backend] object H2StorageBackend

override def duplicateKeyError: String = "Unique index or primary key violation"

override def checkCompatibility(connection: Connection)(implicit
loggingContext: LoggingContext
): Unit =
()

val SQL_INSERT_COMMAND: String =
"""merge into participant_command_submissions pcs
|using dual on deduplication_key = {deduplicationKey}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ private[backend] object OracleStorageBackend

override def duplicateKeyError: String = "unique constraint"

override def checkCompatibility(connection: Connection)(implicit
loggingContext: LoggingContext
): Unit =
()

val SQL_INSERT_COMMAND: String =
"""merge into participant_command_submissions pcs
|using dual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import anorm.SqlParser.get
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.appendonlydao.events.{ContractId, Key, Party}
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.common.{
Expand All @@ -37,6 +37,8 @@ private[backend] object PostgresStorageBackend
with CommonStorageBackend[AppendOnlySchema.Batch]
with EventStorageBackendTemplate {

private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)

override def insertBatch(
connection: Connection,
postgresDbBatch: AppendOnlySchema.Batch,
Expand Down Expand Up @@ -86,6 +88,37 @@ private[backend] object PostgresStorageBackend

override val duplicateKeyError: String = "duplicate key"

def getPostgresVersion(connection: Connection): Option[(Int, Int, Int)] = {
val version = SQL"SHOW server_version".as(get[String](1).single)(connection)
val versionPattern = """(\d)[.](\d)[.](\d)""".r
version match {
case versionPattern(major, minor, patch) => Some(major.toInt, minor.toInt, patch.toInt)
case _ => None
}
}

override def checkCompatibility(
connection: Connection
)(implicit loggingContext: LoggingContext): Unit = {
getPostgresVersion(connection) match {
case Some((major, minor, patch)) =>
if (major.toInt < 10) {
logger.error(
"Deprecated Postgres version. " +
s"Found Postgres version $major.$minor.$patch., minimum required Postgres version is 10. " +
"This application will continue running but is at risk of data loss, as Postgres < 10 does not support crash-fault tolerant hash indices. " +
"Please upgrade your Postgres database to version 10 or later to fix this issue. " +
"In the future, this deprecation warning may be upgraded to a fatal error."
)
}
case None =>
logger.warn(
s"Could not determine the version of the Postgres database. Please verify that this application is compatible with this Postgres version."
)
}
()
}

override def commandCompletions(
startExclusive: Offset,
endInclusive: Offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package com.daml.platform.store.backend

import com.daml.platform.store.backend.postgresql.PostgresStorageBackend
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.Inside._

final class StorageBackendPostgresSpec
extends AsyncFlatSpec
Expand All @@ -12,4 +14,20 @@ final class StorageBackendPostgresSpec
with StorageBackendTestsInitialization[StorageBackendProviderPostgres.DB_BATCH]
with StorageBackendTestsIngestion[StorageBackendProviderPostgres.DB_BATCH]
with StorageBackendTestsReset[StorageBackendProviderPostgres.DB_BATCH]
with StorageBackendTestsPruning[StorageBackendProviderPostgres.DB_BATCH]
with StorageBackendTestsPruning[StorageBackendProviderPostgres.DB_BATCH] {

behavior of "StorageBackend (Postgres)"

it should "find the Postgres version" in {
for {
version <- executeSql(PostgresStorageBackend.getPostgresVersion)
} yield {
inside(version) { case Some(versionNumbers) =>
// Minimum Postgres version used in tests
versionNumbers._1 should be >= 9
versionNumbers._2 should be >= 0
versionNumbers._3 should be >= 0
}
}
}
}

0 comments on commit 1f98445

Please sign in to comment.