Skip to content

Commit

Permalink
DPP-468 StorageBackend tests (#10529)
Browse files Browse the repository at this point in the history
* Add StorageBackend tests

changelog_begin
changelog_end

* Fix Oracle tests

* Do not use empty byte arrays

* Format

* Fix after rebase

* Substitute type params with type bounded ingest method

* Remove empty line

* Assert on configuration contents

* Fix Oracle build

* Add tests for ingestion initialization

* fmt

* Add test for leftover data after reset

* Add resetAll

* Use resetAll between tests

Co-authored-by: Marton Nagy <[email protected]>
  • Loading branch information
rautenrieth-da and nmarton-da authored Aug 23, 2021
1 parent 4e08b47 commit 8501832
Show file tree
Hide file tree
Showing 18 changed files with 1,064 additions and 82 deletions.
4 changes: 4 additions & 0 deletions ledger/participant-integration-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ da_scala_library(
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//libs-scala/scala-utils",
"//libs-scala/timer-utils",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_netty",
Expand Down Expand Up @@ -364,13 +365,16 @@ da_scala_test_suite(
"//ledger/ledger-configuration",
"//ledger/ledger-offset",
"//ledger/ledger-resources",
"//ledger/metrics",
"//ledger/metrics:metrics-test-lib",
"//ledger/participant-state",
"//ledger/participant-state-index",
"//libs-scala/contextualized-logging",
"//libs-scala/logging-entries",
"//libs-scala/oracle-testing",
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/scala-utils",
"@maven//:org_scalatest_scalatest_compatible",
"@maven//:org_slf4j_slf4j_api",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,20 @@ trait StorageBackend[DB_BATCH]
with EventStorageBackend
with DataSourceStorageBackend
with DBLockStorageBackend {

/** Truncates all storage backend tables, EXCEPT the packages table.
* Does not touch other tables, like the Flyway history table.
* Reason: the reset() call is used by the ledger API reset service,
* which is mainly used for application tests in another big project,
* and re-uploading packages after each test significantly slows down their test time.
*/
def reset(connection: Connection): Unit

/** Truncates ALL storage backend tables.
* Does not touch other tables, like the Flyway history table.
* The result is a database that looks the same as a freshly created database with Flyway migrations applied.
*/
def resetAll(connection: Connection): Unit
def duplicateKeyError: String // TODO: Avoid brittleness of error message checks
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ private[backend] object H2StorageBackend
()
}

override def resetAll(connection: Connection): Unit = {
SQL("""set referential_integrity false;
|truncate table configuration_entries;
|truncate table packages;
|truncate table package_entries;
|truncate table parameters;
|truncate table participant_command_completions;
|truncate table participant_command_submissions;
|truncate table participant_events_divulgence;
|truncate table participant_events_create;
|truncate table participant_events_consuming_exercise;
|truncate table participant_events_non_consuming_exercise;
|truncate table parties;
|truncate table party_entries;
|set referential_integrity true;""".stripMargin)
.execute()(connection)
()
}

override def duplicateKeyError: String = "Unique index or primary key violation"

val SQL_INSERT_COMMAND: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ private[backend] object OracleStorageBackend
"truncate table party_entries cascade",
).map(SQL(_)).foreach(_.execute()(connection))

override def resetAll(connection: Connection): Unit =
List(
"truncate table configuration_entries cascade",
"truncate table packages cascade",
"truncate table package_entries cascade",
"truncate table parameters cascade",
"truncate table participant_command_completions cascade",
"truncate table participant_command_submissions cascade",
"truncate table participant_events_divulgence cascade",
"truncate table participant_events_create cascade",
"truncate table participant_events_consuming_exercise cascade",
"truncate table participant_events_non_consuming_exercise cascade",
"truncate table parties cascade",
"truncate table party_entries cascade",
).map(SQL(_)).foreach(_.execute()(connection))

override def duplicateKeyError: String = "unique constraint"

val SQL_INSERT_COMMAND: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ private[backend] object PostgresStorageBackend
()
}

override def resetAll(connection: Connection): Unit = {
SQL("""truncate table configuration_entries cascade;
|truncate table packages cascade;
|truncate table package_entries cascade;
|truncate table parameters cascade;
|truncate table participant_command_completions cascade;
|truncate table participant_command_submissions cascade;
|truncate table participant_events_divulgence cascade;
|truncate table participant_events_create cascade;
|truncate table participant_events_consuming_exercise cascade;
|truncate table participant_events_non_consuming_exercise cascade;
|truncate table parties cascade;
|truncate table party_entries cascade;
|""".stripMargin)
.execute()(connection)
()
}

override val duplicateKeyError: String = "duplicate key"

object PostgresQueryStrategy extends QueryStrategy {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.backend

import java.sql.Connection

import com.daml.platform.store.backend.h2.H2StorageBackend
import com.daml.platform.store.backend.oracle.OracleStorageBackend
import com.daml.platform.store.backend.postgresql.PostgresStorageBackend
import com.daml.testing.oracle.OracleAroundAll
import com.daml.testing.postgresql.PostgresAroundAll
import org.scalatest.Suite

/** Creates a database and a [[StorageBackend]].
* Used by [[StorageBackendSpec]] to run all StorageBackend tests on different databases.
*/
private[backend] trait StorageBackendProvider {
protected def jdbcUrl: String
protected def backend: StorageBackend[_]
protected final def ingest(dbDtos: Vector[DbDto], connection: Connection): Unit = {
def typeBoundIngest[T](backend: StorageBackend[T]): Unit =
backend.insertBatch(connection, backend.batch(dbDtos))
typeBoundIngest(backend)
}
}

private[backend] trait StorageBackendProviderPostgres
extends StorageBackendProvider
with PostgresAroundAll { this: Suite =>
override protected def jdbcUrl: String = postgresDatabase.url
override protected val backend: StorageBackend[_] = PostgresStorageBackend
}

private[backend] trait StorageBackendProviderH2 extends StorageBackendProvider { this: Suite =>
override protected def jdbcUrl: String = "jdbc:h2:mem:storage_backend_provider;db_close_delay=-1"
override protected val backend: StorageBackend[_] = H2StorageBackend
}

private[backend] trait StorageBackendProviderOracle
extends StorageBackendProvider
with OracleAroundAll { this: Suite =>
override protected def jdbcUrl: String =
s"jdbc:oracle:thin:$oracleUser/$oraclePwd@localhost:$oraclePort/ORCLPDB1"
override protected val backend: StorageBackend[_] = OracleStorageBackend
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.backend

import java.sql.Connection
import java.util.concurrent.atomic.AtomicInteger

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.resources.{Resource, ResourceContext}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.appendonlydao.DbDispatcher
import com.daml.platform.store.FlywayMigrations
import org.scalatest.{AsyncTestSuite, BeforeAndAfterEach}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}

private[backend] trait StorageBackendSpec
extends AkkaBeforeAndAfterAll
with BeforeAndAfterEach
with StorageBackendProvider { this: AsyncTestSuite =>

protected val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
implicit protected val loggingContext: LoggingContext = LoggingContext.ForTesting

private val connectionPoolSize: Int = 16
private val metrics = new Metrics(new MetricRegistry)

// Initialized in beforeAll()
private var dbDispatcherResource: Resource[DbDispatcher] = _
private var dbDispatcher: DbDispatcher = _

protected def executeSql[T](sql: Connection => T): Future[T] = {
dbDispatcher.executeSql(metrics.test.db)(sql)
}

override protected def beforeAll(): Unit = {
super.beforeAll()

implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
dbDispatcherResource = for {
_ <- Resource.fromFuture(
new FlywayMigrations(jdbcUrl).migrate(enableAppendOnlySchema = true)
)
dispatcher <- DbDispatcher
.owner(
dataSource = backend.createDataSource(jdbcUrl),
serverRole = ServerRole.Testing(this.getClass),
connectionPoolSize = connectionPoolSize,
connectionTimeout = FiniteDuration(250, "millis"),
metrics = metrics,
)
.acquire()
} yield dispatcher

dbDispatcher = Await.result(dbDispatcherResource.asFuture, 30.seconds)
logger.info(
s"Finished setting up database $jdbcUrl for tests. You can now connect to this database to debug failed tests. Note that tables are truncated between each test."
)
}

override protected def afterAll(): Unit = {
Await.result(dbDispatcherResource.release(), 30.seconds)
super.afterAll()
}

private val runningTests = new AtomicInteger(0)

// Each test should start with an empty database to allow testing low-level behavior
// However, creating a fresh database for each test would be too expensive.
// Instead, we truncate all tables using the reset() call before each test.
override protected def beforeEach(): Unit = {
super.beforeEach()

assert(
runningTests.incrementAndGet() == 1,
"StorageBackendSpec tests must not run in parallel, as they all run against the same database.",
)
Await.result(executeSql(backend.resetAll), 10.seconds)
}

override protected def afterEach(): Unit = {
assert(
runningTests.decrementAndGet() == 0,
"StorageBackendSpec tests must not run in parallel, as they all run against the same database.",
)

super.afterEach()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.backend

import org.scalatest.flatspec.AsyncFlatSpec

trait StorageBackendSuite
extends StorageBackendSpec
with StorageBackendTestsInitialization
with StorageBackendTestsInitializeIngestion
with StorageBackendTestsIngestion
with StorageBackendTestsReset
with StorageBackendTestsPruning {
this: AsyncFlatSpec =>
}
Loading

0 comments on commit 8501832

Please sign in to comment.