Skip to content

Commit

Permalink
Check compatiblity when creating datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
rautenrieth-da committed Aug 23, 2021
1 parent eb8117b commit b8e4e55
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ final class Metrics(val registry: MetricRegistry) {
val execAll: Timer = overall.executionTimer

val getCompletions: DatabaseMetrics = createDbMetrics("get_completions")
val checkCompatibility: DatabaseMetrics = createDbMetrics("check_compatibility")
val getLedgerId: DatabaseMetrics = createDbMetrics("get_ledger_id")
val getParticipantId: DatabaseMetrics = createDbMetrics("get_participant_id")
val getLedgerEnd: DatabaseMetrics = createDbMetrics("get_ledger_end")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ private class JdbcLedgerDao(

override def currentHealth(): HealthStatus = dbDispatcher.currentHealth()

/** Returns a failed Future if the Dao is connected to an incompatible storage (e.g., database version too old). */
def checkCompatibility()(implicit loggingContext: LoggingContext): Future[Unit] =
dbDispatcher
.executeSql(metrics.daml.index.db.checkCompatibility)(storageBackend.checkCompatibility)

override def lookupLedgerId()(implicit loggingContext: LoggingContext): Future[Option[LedgerId]] =
dbDispatcher
.executeSql(metrics.daml.index.db.getLedgerId)(
Expand Down Expand Up @@ -907,7 +902,6 @@ private[platform] object JdbcLedgerDao {
participantId,
storageBackend,
)
_ <- ResourceOwner.forFuture(ledgerDao.checkCompatibility)
} yield ledgerDao
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ trait StorageBackend[DB_BATCH]
*/
def resetAll(connection: Connection): Unit
def duplicateKeyError: String // TODO: Avoid brittleness of error message checks
def checkCompatibility(connection: Connection)(implicit loggingContext: LoggingContext): Unit
}

trait IngestionStorageBackend[DB_BATCH] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ private[backend] object H2StorageBackend

override def duplicateKeyError: String = "Unique index or primary key violation"

override def checkCompatibility(connection: Connection)(implicit
loggingContext: LoggingContext
): Unit =
()

val SQL_INSERT_COMMAND: String =
"""merge into participant_command_submissions pcs
|using dual on deduplication_key = {deduplicationKey}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ private[backend] object OracleStorageBackend

override def duplicateKeyError: String = "unique constraint"

override def checkCompatibility(connection: Connection)(implicit
loggingContext: LoggingContext
): Unit =
()

val SQL_INSERT_COMMAND: String =
"""merge into participant_command_submissions pcs
|using dual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import com.daml.platform.store.backend.{
import javax.sql.DataSource
import org.postgresql.ds.PGSimpleDataSource

import scala.util.Using

private[backend] object PostgresStorageBackend
extends StorageBackend[AppendOnlySchema.Batch]
with CommonStorageBackend[AppendOnlySchema.Batch]
Expand Down Expand Up @@ -119,7 +121,7 @@ private[backend] object PostgresStorageBackend
}
}

override def checkCompatibility(
private def checkCompatibility(
connection: Connection
)(implicit loggingContext: LoggingContext): Unit = {
getPostgresVersion(connection) match {
Expand Down Expand Up @@ -216,6 +218,12 @@ private[backend] object PostgresStorageBackend
)(implicit loggingContext: LoggingContext): DataSource = {
val pgSimpleDataSource = new PGSimpleDataSource()
pgSimpleDataSource.setUrl(jdbcUrl)

Using.resource(pgSimpleDataSource.getConnection()) { connection =>
checkCompatibility(connection)
connection.close()
}

val hookFunctions = List(
dataSourceConfig.postgresConfig.synchronousCommit.toList
.map(synchCommitValue => exe(s"SET synchronous_commit TO ${synchCommitValue.pgSqlName}")),
Expand Down

0 comments on commit b8e4e55

Please sign in to comment.