[JSON-API] key_hash field to speed up fetchByKey queries (#10631)
* Addition of a key_hash field to speed up fetchByKey queries
CHANGELOG_BEGIN
CHANGELOG_END

* changes to make key_hash an Optional field

CHANGELOG_BEGIN
- Update schema version for the http-json-api query store with the new key_hash field
- Improved performance for the fetchByKey query, which now uses the key_hash field
CHANGELOG_END

* remove btree index for postgres and other changes based on code review comments
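
For context on the change itself: fetchByKey previously matched on the JSON `key` column; this commit stores a hash of the contract key in a new, uniquely indexed `key_hash` column and turns the lookup into an equality match on that hash. The sketch below is only an illustration of that shape, not code from this commit: it uses plain SHA-256 from `java.security` as a stand-in for daml-lf's `Hash.assertHashContractKey` (which the diff uses in `ContractsFetch`), and the template id, key JSON, and table/column selection are made-up placeholders.

```scala
// Illustrative sketch only (not from this commit): shows the shape of the
// key_hash lookup. The real code hashes the key with
// com.daml.lf.crypto.Hash.assertHashContractKey; SHA-256 here is a stand-in,
// and the template id / key JSON / table name are placeholders.
import java.security.MessageDigest

object KeyHashLookupSketch {
  // Derive the hex string that would be stored in the key_hash column
  // for a given (template id, canonical key JSON) pair.
  def keyHashHex(templateId: String, canonicalKeyJson: String): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    digest.update(templateId.getBytes("UTF-8"))
    digest.update(canonicalKeyJson.getBytes("UTF-8"))
    digest.digest().map("%02x".format(_)).mkString
  }

  // fetchByKey then becomes a point lookup on the unique key_hash index
  // instead of a JSON equality test over the key column.
  def fetchByKeySql(keyHash: String): String =
    s"SELECT contract_id, tpid, key, key_hash, payload FROM contract WHERE key_hash = '$keyHash'"

  def main(args: Array[String]): Unit = {
    val hash = keyHashHex("Main:KeyedIou", """{"issuer":"Alice","id":"42"}""")
    println(fetchByKeySql(hash))
  }
}
```

On Postgres the diff below drops the old BTREE index on (tpid, key) and creates a unique index on key_hash instead, so the lookup above is a single index probe.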
akshayshirahatti-da authored Aug 23, 2021
1 parent 5001329 commit 77eb366
Showing 11 changed files with 83 additions and 53 deletions.
1 change: 1 addition & 0 deletions ledger-service/db-backend/BUILD.bazel
@@ -41,6 +41,7 @@ da_scala_library(
],
deps = [
"//daml-lf/data",
"//daml-lf/transaction",
"//libs-scala/scala-utils",
],
)
@@ -26,6 +26,7 @@ import cats.instances.list._
import cats.Applicative
import cats.syntax.applicative._
import cats.syntax.functor._
import com.daml.lf.crypto.Hash
import doobie.free.connection

sealed abstract class Queries(tablePrefix: String) {
@@ -34,7 +35,7 @@ sealed abstract class Queries(tablePrefix: String) {

type SqlInterpol

val schemaVersion = 1
val schemaVersion = 2

protected[this] def dropIfExists(drop: Droppable): Fragment

@@ -67,6 +68,7 @@ sealed abstract class Queries(tablePrefix: String) {
(contract_id $contractIdType NOT NULL CONSTRAINT ${tablePrefixFr}contract_k PRIMARY KEY
,tpid $bigIntType NOT NULL REFERENCES $templateIdTableName (tpid)
,${jsonColumn(sql"key")}
,key_hash $keyHashColumn
,${jsonColumn(contractColumnName)}
$contractsTableSignatoriesObservers
,agreement_text $agreementTextType
@@ -103,6 +105,7 @@ sealed abstract class Queries(tablePrefix: String) {
protected[this] def maxListSize: Option[Int]

protected[this] def jsonColumn(name: Fragment): Fragment
protected[this] def keyHashColumn: Fragment

private[this] val createTemplateIdsTable = CreateTable(
templateIdTableNameRaw,
@@ -330,12 +333,13 @@ sealed abstract class Queries(tablePrefix: String) {
): Query0[DBContract[Mark0, JsValue, JsValue, Vector[String]]] = {
val q = query(tpid, queriesCondition)
q.query[
(String, Mark0, Key, JsValue, SigsObs, SigsObs, Agreement)
].map { case (cid, tpid, rawKey, payload, signatories, observers, rawAgreement) =>
(String, Mark0, Key, Option[String], JsValue, SigsObs, SigsObs, Agreement)
].map { case (cid, tpid, rawKey, keyHash, payload, signatories, observers, rawAgreement) =>
DBContract(
contractId = cid,
templateId = tpid,
key = key(rawKey),
keyHash = keyHash,
payload = payload,
signatories = sigsObs(signatories),
observers = sigsObs(observers),
@@ -366,14 +370,14 @@ sealed abstract class Queries(tablePrefix: String) {
private[http] final def fetchByKey(
parties: OneAnd[Set, String],
tpid: SurrogateTpId,
key: JsValue,
key: Hash,
)(implicit
log: LogHandler,
ipol: SqlInterpol,
): ConnectionIO[Option[DBContract[Unit, JsValue, JsValue, Vector[String]]]] =
selectContracts(parties, tpid, keyEquality(key)).option

private[http] def keyEquality(key: JsValue): Fragment
private[http] def keyEquality(key: Hash): Fragment = sql"key_hash = ${key.toHexString.toString}"

private[http] def equalAtContractPath(path: JsonPath, literal: JsValue): Fragment

@@ -406,6 +410,7 @@ object Queries {
contractId: String,
templateId: TpId,
key: CK,
keyHash: Option[String],
payload: PL,
signatories: Prt,
observers: Prt,
@@ -614,23 +619,23 @@ private final class PostgresQueries(tablePrefix: String) extends Queries(tablePr
protected[this] override def agreementTextType = sql"TEXT NOT NULL"

protected[this] override def jsonColumn(name: Fragment) = name ++ sql" JSONB NOT NULL"
protected[this] override def keyHashColumn = textType

protected[this] override val maxListSize = None

protected[this] val contractsKeysIndexName =
Fragment.const0(s"${tablePrefix}contract_tpid_key_idx")
protected[this] val contractsTableIndexName = Fragment.const0(s"${tablePrefix}contract_tpid_idx")

private[this] val indexContractsKeys = CreateIndex(sql"""
CREATE INDEX $contractsKeysIndexName ON $contractTableName USING BTREE (tpid, key)
""")

private[this] val indexContractsTable = CreateIndex(sql"""
CREATE INDEX $contractsTableIndexName ON $contractTableName (tpid)
""")

private[this] val contractKeyHashIndexName = Fragment.const0(s"${tablePrefix}ckey_hash_idx")
private[this] val contractKeyHashIndex = CreateIndex(
sql"""CREATE UNIQUE INDEX $contractKeyHashIndexName ON $contractTableName (key_hash)"""
)

protected[this] override def initDatabaseDdls =
super.initDatabaseDdls ++ Seq(indexContractsTable, indexContractsKeys)
super.initDatabaseDdls ++ Seq(indexContractsTable, contractKeyHashIndex)

protected[http] override def version()(implicit log: LogHandler): ConnectionIO[Option[Int]] = {
for {
@@ -661,7 +666,7 @@ private final class PostgresQueries(tablePrefix: String) extends Queries(tablePr
Update[DBContract[SurrogateTpId, JsValue, JsValue, Array[String]]](
s"""
INSERT INTO $contractTableNameRaw
VALUES (?, ?, ?::jsonb, ?::jsonb, ?, ?, ?)
VALUES (?, ?, ?::jsonb, ?, ?::jsonb, ?, ?, ?)
ON CONFLICT (contract_id) DO NOTHING
"""
).updateMany(dbcs)
@@ -682,7 +687,7 @@ private final class PostgresQueries(tablePrefix: String) extends Queries(tablePr
trackMatchIndices,
tpidSelector = fr"tpid",
query = (tpid, unionPred) =>
sql"""SELECT contract_id, $tpid tpid, key, payload, signatories, observers, agreement_text
sql"""SELECT contract_id, $tpid tpid, key, key_hash, payload, signatories, observers, agreement_text
FROM $contractTableName AS c
WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[])
AND ($unionPred)""",
@@ -700,9 +705,6 @@ private final class PostgresQueries(tablePrefix: String) extends Queries(tablePr
)
)

private[http] override def keyEquality(key: JsValue): Fragment =
sql"key = $key::jsonb"

private[http] override def equalAtContractPath(path: JsonPath, literal: JsValue) =
fragmentContractPath(path) ++ sql" = ${literal}::jsonb"

@@ -758,6 +760,8 @@ private final class OracleQueries(tablePrefix: String) extends Queries(tablePref
protected[this] override def jsonColumn(name: Fragment) =
sql"$name CLOB NOT NULL CONSTRAINT ${tablePrefixFr}ensure_json_$name CHECK ($name IS JSON)"

protected[this] override def keyHashColumn = sql"NVARCHAR2(64)"

// See http://www.dba-oracle.com/t_ora_01795_maximum_number_of_expressions_in_a_list_is_1000.htm
protected[this] override def maxListSize = Some(1000)

@@ -782,8 +786,13 @@ private final class OracleQueries(tablePrefix: String) extends Queries(tablePref
sql"""CREATE INDEX $stakeholdersIndexName ON $contractStakeholdersViewName (tpid, stakeholder)"""
)

private[this] val contractKeyHashIndexName = Fragment.const0(s"${tablePrefix}ckey_hash_idx")
private[this] val contractKeyHashIndex = CreateIndex(
sql"""CREATE UNIQUE INDEX $contractKeyHashIndexName ON $contractTableName (key_hash)"""
)

protected[this] override def initDatabaseDdls =
super.initDatabaseDdls ++ Seq(stakeholdersView, stakeholdersIndex)
super.initDatabaseDdls ++ Seq(stakeholdersView, stakeholdersIndex, contractKeyHashIndex)

protected[http] override def version()(implicit log: LogHandler): ConnectionIO[Option[Int]] = {
import cats.implicits._
@@ -811,11 +820,11 @@ private final class OracleQueries(tablePrefix: String) extends Queries(tablePref
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler, ipol: SqlInterpol): ConnectionIO[Int] = {
import spray.json.DefaultJsonProtocol._
Update[DBContract[SurrogateTpId, JsValue, JsValue, JsValue]](
Update[DBContract[SurrogateTpId, DBContractKey, JsValue, JsValue]](
s"""
INSERT /*+ ignore_row_on_dupkey_index($contractTableNameRaw(contract_id)) */
INTO $contractTableNameRaw (contract_id, tpid, key, payload, signatories, observers, agreement_text)
VALUES (?, ?, ?, ?, ?, ?, ?)
INTO $contractTableNameRaw (contract_id, tpid, key, key_hash, payload, signatories, observers, agreement_text)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
).updateMany(
dbcs.map(_.mapKeyPayloadParties(identity, identity, _.toJson))
@@ -839,10 +848,10 @@ private final class OracleQueries(tablePrefix: String) extends Queries(tablePref
row_number() over (PARTITION BY c.contract_id ORDER BY c.contract_id) AS rownumber"""
import Queries.CompatImplicits.catsReducibleFromFoldable1
val outerSelectList =
sql"""contract_id, template_id, key, payload,
sql"""contract_id, template_id, key, key_hash, payload,
signatories, observers, agreement_text"""
val dupQ =
sql"""SELECT c.contract_id contract_id, $tpid template_id, key, payload,
sql"""SELECT c.contract_id contract_id, $tpid template_id, key, key_hash, payload,
signatories, observers, agreement_text ${rownum getOrElse fr""}
FROM $contractTableName c
JOIN $contractStakeholdersViewName cst ON (c.contract_id = cst.contract_id)
Expand All @@ -858,11 +867,6 @@ private final class OracleQueries(tablePrefix: String) extends Queries(tablePref
agreement = (_: Option[String]) getOrElse "",
)

private[http] override def keyEquality(key: JsValue): Fragment = {
import spray.json.DefaultJsonProtocol.JsValueFormat
sql"JSON_EQUAL(key, ${toDBContractKey(key)})"
}

private[this] def pathSteps(path: JsonPath): Cord =
path.elems.foldMap(_.fold(k => (".\"": Cord) ++ k :- '"', (_: _0.type) => "[0]"))

12 changes: 6 additions & 6 deletions ledger-service/http-json-perf/README.md
@@ -16,7 +16,7 @@ Gatling scenarios extend from `io.gatling.core.scenario.Simulation`:
- `com.daml.http.perf.scenario.SyncQueryConstantAcs`
- `com.daml.http.perf.scenario.SyncQueryNewAcs`
- `com.daml.http.perf.scenario.SyncQueryVariableAcs`
- `com.daml.http.perf.scenario.MultiUserQueryScenario`
- `com.daml.http.perf.scenario.OracleMultiUserQueryScenario`

# 2. Running Gatling Scenarios from Bazel

@@ -34,9 +34,9 @@ $ bazel run //ledger-service/http-json-perf:http-json-perf-binary -- \
--jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU"
```

## 2.3 Running MultiUserQueryScenario
## 2.3 Running OracleMultiUserQueryScenario

Preferably retain the data between runs to specifically focus on testing query performance.
We use an external docker oracle vm, so we want to retain the data between runs to specifically focus on testing query performance.
use `RETAIN_DATA` and `USE_DEFAULT_USER` env vars to use a static user(`ORACLE_USER`) and preserve data.
This scenario uses a single template `KeyedIou` defined in `LargeAcs.daml`.

@@ -48,7 +48,7 @@ We can control a few scenario parameters i.e `NUM_RECORDS` `NUM_QUERIES` `NUM_RE

```
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="populateCache" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.MultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="populateCache" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
```

@@ -58,7 +58,7 @@ Query contracts by the defined key field.

```
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByKey" NUM_QUERIES=100 bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.MultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByKey" NUM_QUERIES=100 bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
```

@@ -68,7 +68,7 @@ Query contracts by a field on the payload which is the `id` in this case.

```
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByQuery" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.MultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
USE_DEFAULT_USER=true RETAIN_DATA=true RUN_MODE="fetchByQuery" bazel run //ledger-service/http-json-perf:http-json-perf-binary-ee -- --scenario=com.daml.http.perf.scenario.OracleMultiUserQueryScenario --jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwczovL2RhbWwuY29tL2xlZGdlci1hcGkiOnsibGVkZ2VySWQiOiJNeUxlZGdlciIsImFwcGxpY2F0aW9uSWQiOiJmb29iYXIiLCJhY3RBcyI6WyJBbGljZSJdfX0.VdDI96mw5hrfM5ZNxLyetSVwcD7XtLT4dIdHIOa9lcU --dars=$PWD/bazel-bin/ledger-service/http-json-perf/LargeAcs.dar --query-store-index oracle
```

@@ -219,7 +219,6 @@ object Main extends StrictLogging {
user.name,
user.pwd,
dbStartupMode = startupMode,
connectionTimeout = 15000, // increase value for performance tests
)
}

@@ -3,7 +3,7 @@

package com.daml.http.perf.scenario

import com.daml.http.perf.scenario.MultiUserQueryScenario._
import com.daml.http.perf.scenario.OracleMultiUserQueryScenario._
import io.gatling.core.Predef._
import io.gatling.core.structure.PopulationBuilder
import io.gatling.http.Predef._
@@ -19,15 +19,15 @@ private[scenario] trait HasRandomCurrency {
}
}

object MultiUserQueryScenario {
object OracleMultiUserQueryScenario {
sealed trait RunMode { def name: String }
case object PopulateCache extends RunMode { val name = "populateCache" }
case object FetchByKey extends RunMode { val name = "fetchByKey" }
case object FetchByQuery extends RunMode { val name = "fetchByQuery" }
}

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class MultiUserQueryScenario
class OracleMultiUserQueryScenario
extends Simulation
with SimulationConfig
with HasRandomAmount
@@ -142,11 +142,10 @@ class MultiUserQueryScenario
def getPopulationBuilder(runMode: RunMode): PopulationBuilder = {
runMode match {
case PopulateCache =>
val currIter = currencies.iterator
writeScn
.inject(atOnceUsers(numWriters))
.andThen(
currQueryScn(numIterations = currencies.size, () => currIter.next())
currQueryScn(numIterations = 1, () => currencies.head)
.inject(
nothingFor(2.seconds),
atOnceUsers(1),
@@ -56,6 +56,7 @@ abstract class ContractDaoBenchmark extends OracleAround {
contractId = s"#$id",
templateId = tpid,
key = JsNull,
keyHash = None,
payload = payload,
signatories = Seq(signatory),
observers = Seq.empty,
@@ -178,6 +178,7 @@ private class ContractsFetch(
): Exception \/ PreInsertContract = {
import scalaz.syntax.traverse._
import scalaz.std.option._
import com.daml.lf.crypto.Hash
for {
ac <- domain.ActiveContract fromLedgerApi ce leftMap (de =>
new IllegalArgumentException(s"contract ${ce.contractId}: ${de.shows}"): Exception,
@@ -188,6 +189,11 @@ private class ContractsFetch(
contractId = ac.contractId.unwrap,
templateId = ac.templateId,
key = lfKey.cata(lfValueToDbJsValue, JsNull),
keyHash = lfKey.map(
Hash
.assertHashContractKey(TemplateId.toLedgerApiValue(ac.templateId), _)
.toHexString
),
payload = lfValueToDbJsValue(lfArg),
signatories = ac.signatories,
observers = ac.observers,
@@ -10,11 +10,9 @@ import akka.stream.Materializer
import com.daml.lf
import com.daml.http.LedgerClientJwt.Terminates
import com.daml.http.dbbackend.ContractDao
import com.daml.http.domain.TemplateId.toLedgerApiValue
import com.daml.http.domain.{GetActiveContractsRequest, JwtPayload, TemplateId}
import com.daml.http.json.JsonProtocol.LfValueCodec
import com.daml.http.json.JsonProtocol.LfValueDatabaseCodec.{
apiValueToJsValue => toDbCompatibleJson
}
import com.daml.http.query.ValuePredicate
import util.{AbsoluteBookmark, ApiValueToLfValueConverter, ContractStreamStep, InsertDeleteStep}
import com.daml.http.util.ContractStreamStep.{Acs, LiveBegin}
@@ -292,13 +290,17 @@ class ContractsService(
)(implicit
lc: LoggingContextOf[InstanceUUID]
): Future[Option[domain.ActiveContract[LfV]]] = {
import ctx.{jwt, parties, templateIds => templateId}
import ctx.{jwt, parties, templateIds => templateId}, com.daml.lf.crypto.Hash
for {
resolved <- toFuture(resolveTemplateId(templateId))
found <- unsafeRunAsync {
import doobie.implicits._, cats.syntax.apply._
fetch.fetchAndPersist(jwt, parties, List(resolved)) *>
ContractDao.fetchByKey(parties, resolved, toDbCompatibleJson(contractKey))
ContractDao.fetchByKey(
parties,
resolved,
Hash.assertHashContractKey(toLedgerApiValue(resolved), contractKey),
)
}
} yield found
}
