From ef239fdd1a8e5ddcaa5154ce4826422a76820dca Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 25 Aug 2021 08:30:39 +0200 Subject: [PATCH] participant-integration-api: Move `TrackerMap` code around. [KVL-1009] (#10653) * participant-integration-api: Make `TrackerMap` a `Tracker`. Mostly by moving parameters around. CHANGELOG_BEGIN CHANGELOG_END * participant-integration-api: Clean up `TrackerMap` a little more. * participant-integration-api: Make `TrackerMap` generic. * participant-integration-api: Move `Tracker.WithLastSubmission`. It's only used by `TrackerMap`, so now it's an internal class there. * participant-integration-api: Only provide the key to `newTracker`. * participant-integration-api: Subsume the `TrackerMap` cleanup schedule. * participant-integration-api: Construct the tracker map outside. * ledger-api-common: Remove some unnecessary braces. * participant-integration-api: Prettify `TrackerMap` some more. * participant-integration-api: Make `TrackerMap.selfCleaning` a class. * participant-integration-api: Add some tests for TrackerMap. * participant-integration-api: Convert a method to `Runnable`, 2.12-style. Apparently underscores aren't good enough. * ledger-api-client: Delete CompletionSuccess#unapply. It doesn't work on Scala 2.12. --- .../commands/tracker/CompletionResponse.scala | 9 - .../api/validation/CommandsValidator.scala | 16 +- .../participant-integration-api/BUILD.bazel | 4 + .../services/ApiCommandService.scala | 147 ++++++++------- .../apiserver/services/tracking/Tracker.scala | 28 +-- .../services/tracking/TrackerImpl.scala | 2 +- .../services/tracking/TrackerMap.scala | 168 +++++++++++------- .../services/tracking/TrackerMapSpec.scala | 161 +++++++++++++++++ 8 files changed, 358 insertions(+), 177 deletions(-) create mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerMapSpec.scala diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala index 8bf3901507f2..f422840f8fd0 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala @@ -49,15 +49,6 @@ object CompletionResponse { originalStatus: StatusProto, ) - object CompletionSuccess { - - /** In most cases we're not interested in the original grpc status, as it's used only to keep backwards compatibility - */ - def unapply(success: CompletionSuccess): Option[(String, String)] = Some( - success.commandId -> success.transactionId - ) - } - def apply(completion: Completion): Either[CompletionFailure, CompletionSuccess] = completion.status match { case Some(grpcStatus) if Code.OK.value() == grpcStatus.code => diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala index fff37c0a1f4f..9e15baf1d66d 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/ledger/api/validation/CommandsValidator.scala @@ -203,15 +203,17 @@ object CommandsValidator { } def effectiveSubmitters(commands: ProtoCommands): Submitters[String] = { - val effectiveActAs = - if (commands.party.isEmpty) - commands.actAs.toSet - else - commands.actAs.toSet + commands.party - val effectiveReadAs = commands.readAs.toSet -- effectiveActAs - Submitters(effectiveActAs, effectiveReadAs) + val actAs = effectiveActAs(commands) + val readAs = commands.readAs.toSet -- actAs + Submitters(actAs, readAs) } + def effectiveActAs(commands: ProtoCommands): Set[String] = + if (commands.party.isEmpty) + commands.actAs.toSet + else + commands.actAs.toSet + commands.party + val noSubmitters: Submitters[String] = Submitters(Set.empty, Set.empty) def validateSubmitters( diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel index 9fcf069f0ecf..0161909b31f4 100644 --- a/ledger/participant-integration-api/BUILD.bazel +++ b/ledger/participant-integration-api/BUILD.bazel @@ -232,6 +232,9 @@ da_scala_test_suite( jvm_flags = [ "-Djava.security.debug=\"certpath ocsp\"", # This facilitates debugging of the OCSP checks mechanism ], + plugins = [ + silencer_plugin, + ], resources = glob(["src/test/resources/**/*"]), scala_deps = [ "@maven//:com_typesafe_akka_akka_actor", @@ -241,6 +244,7 @@ da_scala_test_suite( "@maven//:org_mockito_mockito_scala", "@maven//:org_playframework_anorm_anorm", "@maven//:org_playframework_anorm_anorm_tokenizer", + "@maven//:org_scala_lang_modules_scala_collection_compat", "@maven//:org_scalacheck_scalacheck", "@maven//:org_scalactic_scalactic", "@maven//:org_scalatest_scalatest_core", diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala index db2b6b66e4fe..4fd869a3f17c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala @@ -6,7 +6,6 @@ package com.daml.platform.apiserver.services import java.time.Instant import akka.NotUsed -import akka.actor.Cancellable import akka.stream.Materializer import akka.stream.scaladsl.{Flow, Keep, Source} import com.daml.api.util.TimeProvider @@ -19,6 +18,7 @@ import com.daml.ledger.api.v1.command_completion_service.{ } import com.daml.ledger.api.v1.command_service._ import com.daml.ledger.api.v1.command_submission_service.SubmitRequest +import com.daml.ledger.api.v1.commands.Commands import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.api.v1.transaction_service.{ GetFlatTransactionResponse, @@ -39,26 +39,23 @@ import com.daml.metrics.Metrics import com.daml.platform.api.grpc.GrpcApiService import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription import com.daml.platform.apiserver.services.ApiCommandService._ -import com.daml.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap} +import com.daml.platform.apiserver.services.tracking.{Tracker, TrackerImpl, TrackerMap} import com.daml.platform.server.api.ApiException import com.daml.platform.server.api.services.grpc.GrpcCommandService import com.daml.util.Ctx import com.daml.util.akkastreams.MaxInFlight import com.google.protobuf.empty.Empty -import io.grpc._ +import io.grpc.Status import scalaz.syntax.tag._ -import scala.concurrent.duration._ +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.Try private[apiserver] final class ApiCommandService private ( services: LocalServices, - configuration: ApiCommandService.Configuration, - ledgerConfigurationSubscription: LedgerConfigurationSubscription, - metrics: Metrics, + submissionTracker: Tracker, )(implicit - materializer: Materializer, executionContext: ExecutionContext, loggingContext: LoggingContext, ) extends CommandServiceGrpc.CommandService @@ -66,17 +63,10 @@ private[apiserver] final class ApiCommandService private ( private val logger = ContextualizedLogger.get(this.getClass) - private val submissionTracker: TrackerMap = TrackerMap(configuration.retentionPeriod) - private val staleCheckerInterval: FiniteDuration = 30.seconds - - private val trackerCleanupJob: Cancellable = materializer.system.scheduler - .scheduleAtFixedRate(staleCheckerInterval, staleCheckerInterval)(submissionTracker.cleanup) - @volatile private var running = true override def close(): Unit = { logger.info("Shutting down Command Service") - trackerCleanupJob.cancel() running = false submissionTracker.close() } @@ -91,7 +81,7 @@ private[apiserver] final class ApiCommandService private ( logging.readAsStrings(request.getCommands.readAs), ) { implicit loggingContext => if (running) { - track(request) + submissionTracker.track(request) } else { Future.failed( new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down.")) @@ -99,55 +89,6 @@ private[apiserver] final class ApiCommandService private ( }.andThen(logger.logErrorsOnCall[Completion]) } - private def track( - request: SubmitAndWaitRequest - )(implicit - loggingContext: LoggingContext - ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { - val appId = request.getCommands.applicationId - // Note: command completions are returned as long as at least one of the original submitters - // is specified in the command completion request. - val parties = CommandsValidator.effectiveSubmitters(request.getCommands).actAs - val submitter = TrackerMap.Key(application = appId, parties = parties) - // Use just name of first party for open-ended metrics to avoid unbounded metrics name for multiple parties - val metricsPrefixFirstParty = parties.toList.min - submissionTracker.track(submitter, request) { - for { - ledgerEnd <- services.getCompletionEnd().map(_.getOffset) - } yield { - val tracker = - CommandTrackerFlow[Promise[Either[CompletionFailure, CompletionSuccess]], NotUsed]( - services.submissionFlow, - offset => - services - .getCompletionSource( - CompletionStreamRequest( - configuration.ledgerId.unwrap, - appId, - parties.toList, - Some(offset), - ) - ) - .mapConcat(CommandCompletionSource.toStreamElements), - ledgerEnd, - () => ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime), - ) - val trackingFlow = MaxInFlight( - configuration.maxCommandsInFlight, - capacityCounter = metrics.daml.commands.maxInFlightCapacity(metricsPrefixFirstParty), - lengthCounter = metrics.daml.commands.maxInFlightLength(metricsPrefixFirstParty), - ).joinMat(tracker)(Keep.right) - TrackerImpl( - trackingFlow, - configuration.inputBufferSize, - capacityCounter = metrics.daml.commands.inputBufferCapacity(metricsPrefixFirstParty), - lengthCounter = metrics.daml.commands.inputBufferLength(metricsPrefixFirstParty), - delayTimer = metrics.daml.commands.inputBufferDelay(metricsPrefixFirstParty), - ) - } - } - } - override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = submitAndWaitInternal(request).map( _.fold( @@ -205,12 +146,12 @@ private[apiserver] final class ApiCommandService private ( }, ) } - - override def toString: String = ApiCommandService.getClass.getSimpleName } private[apiserver] object ApiCommandService { + private val trackerCleanupInterval = 30.seconds + def create( configuration: Configuration, services: LocalServices, @@ -221,9 +162,15 @@ private[apiserver] object ApiCommandService { materializer: Materializer, executionContext: ExecutionContext, loggingContext: LoggingContext, - ): CommandServiceGrpc.CommandService with GrpcApiService = + ): CommandServiceGrpc.CommandService with GrpcApiService = { + val submissionTracker = new TrackerMap.SelfCleaning( + configuration.retentionPeriod, + Tracking.getTrackerKey, + Tracking.newTracker(configuration, services, ledgerConfigurationSubscription, metrics), + trackerCleanupInterval, + ) new GrpcCommandService( - new ApiCommandService(services, configuration, ledgerConfigurationSubscription, metrics), + service = new ApiCommandService(services, submissionTracker), ledgerId = configuration.ledgerId, currentLedgerTime = () => timeProvider.getCurrentTime, currentUtcTime = () => Instant.now, @@ -231,6 +178,7 @@ private[apiserver] object ApiCommandService { ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime), generateSubmissionId = SubmissionIdGenerator.Random, ) + } final case class Configuration( ledgerId: LedgerId, @@ -251,4 +199,65 @@ private[apiserver] object ApiCommandService { getFlatTransactionById: GetTransactionByIdRequest => Future[GetFlatTransactionResponse], ) + private object Tracking { + final case class Key(applicationId: String, parties: Set[String]) + + def getTrackerKey(commands: Commands): Tracking.Key = { + val parties = CommandsValidator.effectiveActAs(commands) + Tracking.Key(commands.applicationId, parties) + } + + def newTracker( + configuration: Configuration, + services: LocalServices, + ledgerConfigurationSubscription: LedgerConfigurationSubscription, + metrics: Metrics, + )( + key: Tracking.Key + )(implicit + materializer: Materializer, + executionContext: ExecutionContext, + loggingContext: LoggingContext, + ): Future[Tracker] = { + // Note: command completions are returned as long as at least one of the original submitters + // is specified in the command completion request. + // Use just name of first party for open-ended metrics to avoid unbounded metrics name for multiple parties + val metricsPrefixFirstParty = key.parties.min + for { + ledgerEnd <- services.getCompletionEnd().map(_.getOffset) + } yield { + val commandTrackerFlow = + CommandTrackerFlow[Promise[Either[CompletionFailure, CompletionSuccess]], NotUsed]( + services.submissionFlow, + offset => + services + .getCompletionSource( + CompletionStreamRequest( + configuration.ledgerId.unwrap, + key.applicationId, + key.parties.toList, + Some(offset), + ) + ) + .mapConcat(CommandCompletionSource.toStreamElements), + ledgerEnd, + () => ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime), + ) + val trackingFlow = MaxInFlight( + configuration.maxCommandsInFlight, + capacityCounter = metrics.daml.commands.maxInFlightCapacity(metricsPrefixFirstParty), + lengthCounter = metrics.daml.commands.maxInFlightLength(metricsPrefixFirstParty), + ).joinMat(commandTrackerFlow)(Keep.right) + + TrackerImpl( + trackingFlow, + configuration.inputBufferSize, + capacityCounter = metrics.daml.commands.inputBufferCapacity(metricsPrefixFirstParty), + lengthCounter = metrics.daml.commands.inputBufferLength(metricsPrefixFirstParty), + delayTimer = metrics.daml.commands.inputBufferDelay(metricsPrefixFirstParty), + ) + } + } + } + } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/Tracker.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/Tracker.scala index bea8c1247f0d..7f42de356295 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/Tracker.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/Tracker.scala @@ -12,35 +12,11 @@ import com.daml.logging.LoggingContext import scala.concurrent.{ExecutionContext, Future} -private[tracking] trait Tracker extends AutoCloseable { +trait Tracker extends AutoCloseable { def track(request: SubmitAndWaitRequest)(implicit - ec: ExecutionContext, + executionContext: ExecutionContext, loggingContext: LoggingContext, ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] } - -private[tracking] object Tracker { - - class WithLastSubmission(delegate: Tracker) extends Tracker { - - override def close(): Unit = delegate.close() - - @volatile private var lastSubmission = System.nanoTime() - - def getLastSubmission: Long = lastSubmission - - override def track(request: SubmitAndWaitRequest)(implicit - ec: ExecutionContext, - loggingContext: LoggingContext, - ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { - lastSubmission = System.nanoTime() - delegate.track(request) - } - } - - object WithLastSubmission { - def apply(delegate: Tracker): WithLastSubmission = new WithLastSubmission(delegate) - } -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala index bd0662a21c5c..fab89a3fd102 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerImpl.scala @@ -35,7 +35,7 @@ private[services] final class TrackerImpl( import TrackerImpl.logger override def track(request: SubmitAndWaitRequest)(implicit - ec: ExecutionContext, + executionContext: ExecutionContext, loggingContext: LoggingContext, ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { logger.trace("Tracking command") diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala index 7e6fd4fe5c26..7bb07dd0922d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/TrackerMap.scala @@ -5,100 +5,125 @@ package com.daml.platform.apiserver.services.tracking import java.util.concurrent.atomic.AtomicReference +import akka.stream.Materializer import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest +import com.daml.ledger.api.v1.commands.Commands import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ CompletionSuccess, TrackedCompletionFailure, } import com.daml.logging.{ContextualizedLogger, LoggingContext} -import org.slf4j.LoggerFactory +import com.daml.platform.apiserver.services.tracking.TrackerMap._ import scala.collection.immutable.HashMap -import scala.concurrent.duration.{FiniteDuration, _} +import scala.concurrent.duration.{DurationLong, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} /** A map for [[Tracker]]s with thread-safe tracking methods and automatic cleanup. A tracker tracker, if you will. * @param retentionPeriod The minimum finite duration for which to retain idle trackers. */ -private[services] final class TrackerMap(retentionPeriod: FiniteDuration)(implicit - loggingContext: LoggingContext -) extends AutoCloseable { +private[services] final class TrackerMap[Key]( + retentionPeriod: FiniteDuration, + getKey: Commands => Key, + newTracker: Key => Future[Tracker], +)(implicit loggingContext: LoggingContext) + extends Tracker + with AutoCloseable { private val logger = ContextualizedLogger.get(this.getClass) private val lock = new Object() @volatile private var trackerBySubmitter = - HashMap.empty[TrackerMap.Key, TrackerMap.AsyncResource[Tracker.WithLastSubmission]] - - val cleanup: Runnable = { - require( - retentionPeriod < Long.MaxValue.nanoseconds, - s"Retention period$retentionPeriod is too long. Must be below ${Long.MaxValue} nanoseconds.", - ) - - val retentionNanos = retentionPeriod.toNanos - - { () => - lock.synchronized { - val nanoTime = System.nanoTime() - trackerBySubmitter foreach { case (submitter, trackerResource) => - trackerResource.ifPresent(tracker => - if (nanoTime - tracker.getLastSubmission > retentionNanos) { - logger.info( - s"Shutting down tracker for $submitter after inactivity of $retentionPeriod" - ) - remove(submitter) - tracker.close() - } - ) - } - } - } - } - - def track(submitter: TrackerMap.Key, request: SubmitAndWaitRequest)( - newTracker: => Future[Tracker] - )(implicit ec: ExecutionContext): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = + HashMap.empty[Key, TrackerMap.AsyncResource[TrackerWithLastSubmission]] + + require( + retentionPeriod < Long.MaxValue.nanoseconds, + s"Retention period$retentionPeriod is too long. Must be below ${Long.MaxValue} nanoseconds.", + ) + + private val retentionNanos = retentionPeriod.toNanos + + override def track( + request: SubmitAndWaitRequest + )(implicit + executionContext: ExecutionContext, + loggingContext: LoggingContext, + ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { + val commands = request.getCommands + val key = getKey(commands) // double-checked locking trackerBySubmitter .getOrElse( - submitter, + key, lock.synchronized { trackerBySubmitter.getOrElse( - submitter, { - val r = new TrackerMap.AsyncResource(newTracker.map { t => - logger.info(s"Registered tracker for submitter $submitter") - Tracker.WithLastSubmission(t) + key, { + val r = new TrackerMap.AsyncResource(newTracker(key).map { t => + logger.info(s"Registered tracker for submitter $key") + new TrackerWithLastSubmission(t) }) - - trackerBySubmitter += submitter -> r - + trackerBySubmitter += key -> r r }, ) }, ) .flatMap(_.track(request)) - - private def remove(submitter: TrackerMap.Key): Unit = lock.synchronized { - trackerBySubmitter -= submitter } - def close(): Unit = { - lock.synchronized { - logger.info(s"Shutting down ${trackerBySubmitter.size} trackers") - trackerBySubmitter.values.foreach(_.close()) - trackerBySubmitter = HashMap.empty + def cleanup(): Unit = lock.synchronized { + val nanoTime = System.nanoTime() + trackerBySubmitter foreach { case (submitter, trackerResource) => + trackerResource.ifPresent { tracker => + if (nanoTime - tracker.getLastSubmission > retentionNanos) { + logger.info( + s"Shutting down tracker for $submitter after inactivity of $retentionPeriod" + )(trackerResource.loggingContext) + trackerBySubmitter -= submitter + tracker.close() + } + } } } + + def close(): Unit = lock.synchronized { + logger.info(s"Shutting down ${trackerBySubmitter.size} trackers") + trackerBySubmitter.values.foreach(_.close()) + trackerBySubmitter = HashMap.empty + } } private[services] object TrackerMap { - - final case class Key(application: String, parties: Set[String]) + final class SelfCleaning[Key]( + retentionPeriod: FiniteDuration, + getKey: Commands => Key, + newTracker: Key => Future[Tracker], + cleanupInterval: FiniteDuration, + )(implicit + materializer: Materializer, + executionContext: ExecutionContext, + loggingContext: LoggingContext, + ) extends Tracker { + private val delegate = new TrackerMap(retentionPeriod, getKey, newTracker) + private val trackerCleanupJob = materializer.system.scheduler + .scheduleAtFixedRate(cleanupInterval, cleanupInterval)(() => delegate.cleanup()) + + override def track( + request: SubmitAndWaitRequest + )(implicit + executionContext: ExecutionContext, + loggingContext: LoggingContext, + ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = + delegate.track(request) + + override def close(): Unit = { + trackerCleanupJob.cancel() + delegate.close() + } + } sealed trait AsyncResourceState[+T <: AutoCloseable] final case object Waiting extends AsyncResourceState[Nothing] @@ -108,13 +133,15 @@ private[services] object TrackerMap { /** A holder for an AutoCloseable that can be opened and closed async. * If closed before the underlying Future completes, will close the resource on completion. */ - final class AsyncResource[T <: AutoCloseable](future: Future[T]) { - private val logger = LoggerFactory.getLogger(this.getClass) + final class AsyncResource[T <: AutoCloseable]( + future: Future[T] + )(implicit val loggingContext: LoggingContext) { + private val logger = ContextualizedLogger.get(this.getClass) // Must progress Waiting => Ready => Closed or Waiting => Closed. val state: AtomicReference[AsyncResourceState[T]] = new AtomicReference(Waiting) - future.andThen({ + future.andThen { case Success(t) => if (!state.compareAndSet(Waiting, Ready(t))) { // This is the punch line of AsyncResource. @@ -128,18 +155,14 @@ private[services] object TrackerMap { case Failure(ex) => logger.error("failure to get async resource", ex) state.set(Closed) - })(DirectExecutionContext) + }(DirectExecutionContext) - def flatMap[U](f: T => Future[U])(implicit ex: ExecutionContext): Future[U] = { + def flatMap[U](f: T => Future[U])(implicit ex: ExecutionContext): Future[U] = state.get() match { case Waiting => future.flatMap(f) case Closed => throw new IllegalStateException() case Ready(t) => f(t) } - } - - def map[U](f: T => U)(implicit ex: ExecutionContext): Future[U] = - flatMap(t => Future.successful(f(t))) def ifPresent[U](f: T => U): Option[U] = state.get() match { case Ready(t) => Some(f(t)) @@ -152,6 +175,21 @@ private[services] object TrackerMap { } } - def apply(retentionPeriod: FiniteDuration)(implicit loggingContext: LoggingContext): TrackerMap = - new TrackerMap(retentionPeriod) + private final class TrackerWithLastSubmission(delegate: Tracker) extends Tracker { + @volatile private var lastSubmission = System.nanoTime() + + def getLastSubmission: Long = lastSubmission + + override def track( + request: SubmitAndWaitRequest + )(implicit + executionContext: ExecutionContext, + loggingContext: LoggingContext, + ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { + lastSubmission = System.nanoTime() + delegate.track(request) + } + + override def close(): Unit = delegate.close() + } } diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerMapSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerMapSpec.scala new file mode 100644 index 000000000000..0e946a9e36de --- /dev/null +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/TrackerMapSpec.scala @@ -0,0 +1,161 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.apiserver.services.tracking + +import java.util.concurrent.atomic.AtomicInteger + +import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest +import com.daml.ledger.api.v1.commands.Commands +import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ + CompletionSuccess, + TrackedCompletionFailure, +} +import com.daml.logging.LoggingContext +import com.daml.platform.apiserver.services.tracking.TrackerMapSpec._ +import com.google.rpc.status.Status +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec + +import scala.collection.compat._ +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.{Duration, DurationInt} +import scala.concurrent.{ExecutionContext, Future} + +class TrackerMapSpec extends AsyncWordSpec with Matchers { + private implicit val ec: ExecutionContext = ExecutionContext.global + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + + "the tracker map" should { + "delegate submissions to the constructed trackers" in { + val transactionIds = Seq("transaction A", "transaction B").iterator + val tracker = new TrackerMap[String]( + retentionPeriod = 1.minute, + getKey = commands => commands.commandId, + newTracker = _ => Future.successful(new FakeTracker(transactionIds)), + ) + + for { + completion1 <- tracker.track( + SubmitAndWaitRequest.of(commands = Some(Commands(commandId = "1", actAs = Seq("Alice")))) + ) + completion2 <- tracker.track( + SubmitAndWaitRequest.of(commands = Some(Commands(commandId = "2", actAs = Seq("Bob")))) + ) + } yield { + completion1 should matchPattern { case Right(CompletionSuccess("1", "transaction A", _)) => + } + completion2 should matchPattern { case Right(CompletionSuccess("2", "transaction B", _)) => + } + } + } + + "only construct one tracker per key" in { + val trackerCount = new AtomicInteger(0) + val tracker = new TrackerMap[Set[String]]( + retentionPeriod = 1.minute, + getKey = commands => commands.actAs.toSet, + newTracker = actAs => { + trackerCount.incrementAndGet() + val transactionIds = + (0 until Int.MaxValue).iterator.map(i => s"${actAs.mkString(", ")}: $i") + Future.successful(new FakeTracker(transactionIds)) + }, + ) + + for { + completion1 <- tracker.track( + SubmitAndWaitRequest.of( + commands = Some(Commands(commandId = "X", actAs = Seq("Alice", "Bob"))) + ) + ) + completion2 <- tracker.track( + SubmitAndWaitRequest.of( + commands = Some(Commands(commandId = "Y", actAs = Seq("Bob", "Alice"))) + ) + ) + } yield { + trackerCount.get() should be(1) + completion1 should matchPattern { case Right(CompletionSuccess("X", "Alice, Bob: 0", _)) => + } + completion2 should matchPattern { case Right(CompletionSuccess("Y", "Alice, Bob: 1", _)) => + } + } + } + + "only construct one tracker per key, even under heavy contention" in { + val requestCount = 1000 + val expectedTrackerCount = 10 + val actualTrackerCount = new AtomicInteger(0) + val tracker = new TrackerMap[String]( + retentionPeriod = 1.minute, + getKey = commands => commands.applicationId, + newTracker = _ => { + actualTrackerCount.incrementAndGet() + Future.successful(new FakeTracker(transactionIds = Iterator.continually(""))) + }, + ) + + val requests = (0 until requestCount).map { i => + val key = (i % expectedTrackerCount).toString + SubmitAndWaitRequest.of( + commands = Some(Commands(commandId = i.toString, applicationId = key)) + ) + } + Future.sequence(requests.map(tracker.track)).map { completions => + actualTrackerCount.get() should be(expectedTrackerCount) + all(completions) should matchPattern { case Right(_) => } + } + } + + "clean up old trackers" in { + val trackerCounts = TrieMap.empty[Set[String], AtomicInteger] + val tracker = new TrackerMap[Set[String]]( + retentionPeriod = Duration.Zero, + getKey = commands => commands.actAs.toSet, + newTracker = actAs => { + trackerCounts.getOrElseUpdate(actAs, new AtomicInteger(0)).incrementAndGet() + Future.successful(new FakeTracker(transactionIds = Iterator.continually(""))) + }, + ) + + for { + _ <- tracker.track( + SubmitAndWaitRequest.of(commands = Some(Commands(commandId = "1", actAs = Seq("Alice")))) + ) + _ <- tracker.track( + SubmitAndWaitRequest.of(commands = Some(Commands(commandId = "2", actAs = Seq("Bob")))) + ) + _ = tracker.cleanup() + _ <- tracker.track( + SubmitAndWaitRequest.of(commands = Some(Commands(commandId = "3", actAs = Seq("Bob")))) + ) + } yield { + val finalTrackerCounts = trackerCounts.view.mapValues(_.get()).toMap + finalTrackerCounts should be(Map(Set("Alice") -> 1, Set("Bob") -> 2)) + } + } + } +} + +object TrackerMapSpec { + final class FakeTracker(transactionIds: Iterator[String]) extends Tracker { + override def track( + request: SubmitAndWaitRequest + )(implicit + executionContext: ExecutionContext, + loggingContext: LoggingContext, + ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = + Future.successful( + Right( + CompletionSuccess( + commandId = request.getCommands.commandId, + transactionId = transactionIds.next(), + originalStatus = Status.defaultInstance, + ) + ) + ) + + override def close(): Unit = () + } +}