From a00608c8b36dfa212c5b5ac32f23e648e5ea05eb Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Tue, 24 Aug 2021 11:28:18 +0200 Subject: [PATCH] participant-integration-api: Accommodate changes to max dedup time. (#10650) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, if the max deduplication time was extended, the participant _might_ retain the old time for certain submissions. CHANGELOG_BEGIN - [Ledger API Server] The API server manages a single command tracker per (application ID × submitters) pair. This tracker would read the current ledger configuration's maximum deduplication time on creation, but never updated it, leading to trackers that might inadvertently reject a submission when it should have been accepted. The tracker now reads the latest ledger configuration. CHANGELOG_END --- .../services/commands/CommandClient.scala | 2 +- .../commands/CommandTrackerFlow.scala | 2 +- .../commands/tracker/CommandTracker.scala | 63 ++++++++++--------- .../commands/CommandTrackerFlowTest.scala | 38 ++++++++--- .../services/ApiCommandService.scala | 13 +--- 5 files changed, 68 insertions(+), 50 deletions(-) diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala index 5a047336d2c2..9d16d151197f 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandClient.scala @@ -136,7 +136,7 @@ final class CommandClient( CommandSubmissionFlow[(Context, String)](submit(token), config.maxParallelSubmissions), offset => completionSource(parties, offset, token), ledgerEnd.getOffset, - () => config.defaultDeduplicationTime, + () => Some(config.defaultDeduplicationTime), ) )(Keep.right) } diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlow.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlow.scala index a0b1b21002e8..bfe50c2eeb8b 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlow.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlow.scala @@ -42,7 +42,7 @@ object CommandTrackerFlow { ]], SubmissionMat], createCommandCompletionSource: LedgerOffset => Source[CompletionStreamElement, NotUsed], startingOffset: LedgerOffset, - maxDeduplicationTime: () => JDuration, + maxDeduplicationTime: () => Option[JDuration], backOffDuration: FiniteDuration = 1.second, ): Flow[Ctx[Context, SubmitRequest], Ctx[ Context, diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala index 238640122454..26cafbcacc96 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CommandTracker.scala @@ -14,19 +14,19 @@ import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ CompletionFailure, CompletionSuccess, + NotOkResponse, } import com.daml.ledger.client.services.commands.{CompletionStreamElement, tracker} +import com.daml.platform.server.api.validation.ErrorFactories import com.daml.util.Ctx -import com.google.protobuf.duration.{Duration => ProtoDuration} import com.google.protobuf.empty.Empty -import com.google.rpc.code._ -import com.google.rpc.status.Status -import io.grpc.{Status => RpcStatus} +import com.google.rpc.status.{Status => StatusProto} +import io.grpc.Status import org.slf4j.LoggerFactory import scala.collection.compat._ import scala.collection.{immutable, mutable} -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt import scala.concurrent.{Future, Promise} import scala.util.control.NoStackTrace import scala.util.{Failure, Success, Try} @@ -49,7 +49,7 @@ import scala.util.{Failure, Success, Try} * We also have an output for offsets, so the most recent offsets can be reused for recovery. */ // TODO(mthvedt): This should have unit tests. -private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDuration) +private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Option[JDuration]) extends GraphStageWithMaterializedValue[CommandTrackerShape[Context], Future[ immutable.Map[String, Context] ]] { @@ -208,13 +208,13 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur } } - private def registerSubmission(submitRequest: Ctx[Context, SubmitRequest]): Unit = { - submitRequest.value.commands - .fold( + private def registerSubmission(submitRequest: Ctx[Context, SubmitRequest]): Unit = + submitRequest.value.commands match { + case None => throw new IllegalArgumentException( "Commands field is missing from received SubmitRequest in CommandTracker" ) with NoStackTrace - ) { commands => + case Some(commands) => val commandId = commands.commandId logger.trace("Begin tracking of command {}", commandId) if (pendingCommands.contains(commandId)) { @@ -223,23 +223,28 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur s"A command with id $commandId is already being tracked. CommandIds submitted to the CommandTracker must be unique." ) with NoStackTrace } - val commandTimeout = { - lazy val maxDedup = maxDeduplicationTime() - val dedup = commands.deduplicationTime.getOrElse( - ProtoDuration.of(maxDedup.getSeconds, maxDedup.getNano) - ) - Instant.now().plusSeconds(dedup.seconds).plusNanos(dedup.nanos.toLong) + maxDeduplicationTime() match { + case None => + emit( + resultOut, + submitRequest.map { _ => + val status = GrpcStatus.toProto(ErrorFactories.missingLedgerConfig().getStatus) + Left(NotOkResponse(commandId, status)) + }, + ) + case Some(maxDedup) => + val deduplicationDuration = commands.deduplicationTime + .map(d => JDuration.ofSeconds(d.seconds, d.nanos.toLong)) + .getOrElse(maxDedup) + val trackingData = TrackingData( + commandId = commandId, + commandTimeout = Instant.now().plus(deduplicationDuration), + context = submitRequest.context, + ) + pendingCommands += commandId -> trackingData } - - pendingCommands += (commandId -> - TrackingData( - commandId, - commandTimeout, - submitRequest.context, - )) - } - () - } + () + } private def getOutputForTimeout(instant: Instant) = { logger.trace("Checking timeouts at {}", instant) @@ -273,7 +278,7 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur private def getOutputForCompletion(completion: Completion) = { val (commandId, errorText) = { completion.status match { - case Some(status) if Code.fromValue(status.code) == Code.OK => + case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value => completion.commandId -> "successful completion of command" case _ => completion.commandId -> "failed completion of command" @@ -288,7 +293,7 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur private def getOutputForTerminalStatusCode( commandId: String, - status: Status, + status: StatusProto, ): Option[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = { logger.trace("Handling failure of command {}", commandId) pendingCommands @@ -320,5 +325,5 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur object CommandTracker { private val nonTerminalCodes = - Set(RpcStatus.Code.UNKNOWN, RpcStatus.Code.INTERNAL, RpcStatus.Code.OK) + Set(Status.Code.UNKNOWN, Status.Code.INTERNAL, Status.Code.OK) } diff --git a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala index 9c9984178102..6c39dd13065e 100644 --- a/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala +++ b/ledger/ledger-api-client/src/test/suite/scala/com/digitalasset/ledger/client/services/commands/CommandTrackerFlowTest.scala @@ -7,11 +7,11 @@ import java.time.{Instant, Duration => JDuration} import java.util.concurrent.atomic.AtomicReference import akka.NotUsed -import akka.stream.OverflowStrategy import akka.stream.scaladsl.{Flow, Keep, Source, SourceQueueWithComplete} import akka.stream.testkit.javadsl.TestSink import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.{TestPublisher, TestSubscriber} +import akka.stream.{OverflowStrategy, QueueOfferResult} import com.daml.api.util.TimestampConversion._ import com.daml.dec.DirectExecutionContext import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll @@ -39,7 +39,7 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationLong import scala.concurrent.{Future, Promise} import scala.util.{Failure, Success, Try} @@ -115,11 +115,11 @@ class CommandTrackerFlowTest completionSource } - def send(elem: CompletionStreamElement) = + def send(elem: CompletionStreamElement): Future[QueueOfferResult] = for { state <- stateRef.get().future res <- state.queue.offer(elem) - } yield (res) + } yield res def breakCompletionsStream(): Future[Unit] = stateRef @@ -127,14 +127,33 @@ class CommandTrackerFlowTest .future .map(state => state.queue.fail(new RuntimeException("boom"))) - def getLastOffset = stateRef.get().future.map(_.startOffset) + def getLastOffset: Future[LedgerOffset] = + stateRef.get().future.map(_.startOffset) } - import Compat._ - "Command tracking flow" when { + "no configuration is available" should { + "fail immediately" in { + val Handle(submissions, results, _, _) = + runCommandTrackingFlow(allSubmissionsSuccessful, maxDeduplicationTime = None) + + submissions.sendNext(submitRequest) + + results.requestNext().value shouldEqual Left( + NotOkResponse( + commandId, + Status.of( + Code.UNAVAILABLE.value, + "The ledger configuration is not available.", + Seq.empty, + ), + ) + ) + } + } + "two commands are submitted with the same ID" should { "fail the stream" in { @@ -523,7 +542,8 @@ class CommandTrackerFlowTest Ctx[(Int, String), SubmitRequest], Ctx[(Int, String), Try[Empty]], NotUsed, - ] + ], + maxDeduplicationTime: Option[JDuration] = Some(JDuration.ofSeconds(10)), ) = { val completionsMock = new CompletionStreamMock() @@ -533,7 +553,7 @@ class CommandTrackerFlowTest submissionFlow, completionsMock.createCompletionsSource, LedgerOffset(Boundary(LEDGER_BEGIN)), - () => JDuration.ofSeconds(10), + () => maxDeduplicationTime, ) val handle = submissionSource diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala index 23571e85ccd8..db2b6b66e4fe 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala @@ -33,7 +33,6 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{ TrackedCompletionFailure, } import com.daml.ledger.client.services.commands.{CommandCompletionSource, CommandTrackerFlow} -import com.daml.ledger.configuration.{Configuration => LedgerConfiguration} import com.daml.logging.LoggingContext.withEnrichedLoggingContext import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics @@ -43,7 +42,6 @@ import com.daml.platform.apiserver.services.ApiCommandService._ import com.daml.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap} import com.daml.platform.server.api.ApiException import com.daml.platform.server.api.services.grpc.GrpcCommandService -import com.daml.platform.server.api.validation.ErrorFactories import com.daml.util.Ctx import com.daml.util.akkastreams.MaxInFlight import com.google.protobuf.empty.Empty @@ -93,11 +91,7 @@ private[apiserver] final class ApiCommandService private ( logging.readAsStrings(request.getCommands.readAs), ) { implicit loggingContext => if (running) { - ledgerConfigurationSubscription - .latestConfiguration() - .fold[Future[Either[TrackedCompletionFailure, CompletionSuccess]]]( - Future.failed(ErrorFactories.missingLedgerConfig()) - )(ledgerConfig => track(request, ledgerConfig)) + track(request) } else { Future.failed( new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down.")) @@ -106,8 +100,7 @@ private[apiserver] final class ApiCommandService private ( } private def track( - request: SubmitAndWaitRequest, - ledgerConfig: LedgerConfiguration, + request: SubmitAndWaitRequest )(implicit loggingContext: LoggingContext ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { @@ -137,7 +130,7 @@ private[apiserver] final class ApiCommandService private ( ) .mapConcat(CommandCompletionSource.toStreamElements), ledgerEnd, - () => ledgerConfig.maxDeduplicationTime, + () => ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime), ) val trackingFlow = MaxInFlight( configuration.maxCommandsInFlight,