Skip to content

Commit

Permalink
participant-integration-api: Accommodate changes to max dedup time. (#…
Browse files Browse the repository at this point in the history
…10650)

Previously, if the max deduplication time was extended, the participant
_might_ retain the old time for certain submissions.

CHANGELOG_BEGIN
- [Ledger API Server] The API server manages a single command tracker
  per (application ID × submitters) pair. This tracker would read the
  current ledger configuration's maximum deduplication time on creation,
  but never updated it, leading to trackers that might inadvertently
  reject a submission when it should have been accepted. The tracker now
  reads the latest ledger configuration.
CHANGELOG_END
  • Loading branch information
SamirTalwar authored Aug 24, 2021
1 parent 29c546c commit a00608c
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ final class CommandClient(
CommandSubmissionFlow[(Context, String)](submit(token), config.maxParallelSubmissions),
offset => completionSource(parties, offset, token),
ledgerEnd.getOffset,
() => config.defaultDeduplicationTime,
() => Some(config.defaultDeduplicationTime),
)
)(Keep.right)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object CommandTrackerFlow {
]], SubmissionMat],
createCommandCompletionSource: LedgerOffset => Source[CompletionStreamElement, NotUsed],
startingOffset: LedgerOffset,
maxDeduplicationTime: () => JDuration,
maxDeduplicationTime: () => Option[JDuration],
backOffDuration: FiniteDuration = 1.second,
): Flow[Ctx[Context, SubmitRequest], Ctx[
Context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
NotOkResponse,
}
import com.daml.ledger.client.services.commands.{CompletionStreamElement, tracker}
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.util.Ctx
import com.google.protobuf.duration.{Duration => ProtoDuration}
import com.google.protobuf.empty.Empty
import com.google.rpc.code._
import com.google.rpc.status.Status
import io.grpc.{Status => RpcStatus}
import com.google.rpc.status.{Status => StatusProto}
import io.grpc.Status
import org.slf4j.LoggerFactory

import scala.collection.compat._
import scala.collection.{immutable, mutable}
import scala.concurrent.duration._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Future, Promise}
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}
Expand All @@ -49,7 +49,7 @@ import scala.util.{Failure, Success, Try}
* We also have an output for offsets, so the most recent offsets can be reused for recovery.
*/
// TODO(mthvedt): This should have unit tests.
private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDuration)
private[commands] class CommandTracker[Context](maxDeduplicationTime: () => Option[JDuration])
extends GraphStageWithMaterializedValue[CommandTrackerShape[Context], Future[
immutable.Map[String, Context]
]] {
Expand Down Expand Up @@ -208,13 +208,13 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur
}
}

private def registerSubmission(submitRequest: Ctx[Context, SubmitRequest]): Unit = {
submitRequest.value.commands
.fold(
private def registerSubmission(submitRequest: Ctx[Context, SubmitRequest]): Unit =
submitRequest.value.commands match {
case None =>
throw new IllegalArgumentException(
"Commands field is missing from received SubmitRequest in CommandTracker"
) with NoStackTrace
) { commands =>
case Some(commands) =>
val commandId = commands.commandId
logger.trace("Begin tracking of command {}", commandId)
if (pendingCommands.contains(commandId)) {
Expand All @@ -223,23 +223,28 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur
s"A command with id $commandId is already being tracked. CommandIds submitted to the CommandTracker must be unique."
) with NoStackTrace
}
val commandTimeout = {
lazy val maxDedup = maxDeduplicationTime()
val dedup = commands.deduplicationTime.getOrElse(
ProtoDuration.of(maxDedup.getSeconds, maxDedup.getNano)
)
Instant.now().plusSeconds(dedup.seconds).plusNanos(dedup.nanos.toLong)
maxDeduplicationTime() match {
case None =>
emit(
resultOut,
submitRequest.map { _ =>
val status = GrpcStatus.toProto(ErrorFactories.missingLedgerConfig().getStatus)
Left(NotOkResponse(commandId, status))
},
)
case Some(maxDedup) =>
val deduplicationDuration = commands.deduplicationTime
.map(d => JDuration.ofSeconds(d.seconds, d.nanos.toLong))
.getOrElse(maxDedup)
val trackingData = TrackingData(
commandId = commandId,
commandTimeout = Instant.now().plus(deduplicationDuration),
context = submitRequest.context,
)
pendingCommands += commandId -> trackingData
}

pendingCommands += (commandId ->
TrackingData(
commandId,
commandTimeout,
submitRequest.context,
))
}
()
}
()
}

private def getOutputForTimeout(instant: Instant) = {
logger.trace("Checking timeouts at {}", instant)
Expand Down Expand Up @@ -273,7 +278,7 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur
private def getOutputForCompletion(completion: Completion) = {
val (commandId, errorText) = {
completion.status match {
case Some(status) if Code.fromValue(status.code) == Code.OK =>
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
completion.commandId -> "successful completion of command"
case _ =>
completion.commandId -> "failed completion of command"
Expand All @@ -288,7 +293,7 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur

private def getOutputForTerminalStatusCode(
commandId: String,
status: Status,
status: StatusProto,
): Option[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
logger.trace("Handling failure of command {}", commandId)
pendingCommands
Expand Down Expand Up @@ -320,5 +325,5 @@ private[commands] class CommandTracker[Context](maxDeduplicationTime: () => JDur

object CommandTracker {
private val nonTerminalCodes =
Set(RpcStatus.Code.UNKNOWN, RpcStatus.Code.INTERNAL, RpcStatus.Code.OK)
Set(Status.Code.UNKNOWN, Status.Code.INTERNAL, Status.Code.OK)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import java.time.{Instant, Duration => JDuration}
import java.util.concurrent.atomic.AtomicReference

import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Source, SourceQueueWithComplete}
import akka.stream.testkit.javadsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import com.daml.api.util.TimestampConversion._
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
Expand Down Expand Up @@ -39,7 +39,7 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec

import scala.concurrent.duration._
import scala.concurrent.duration.DurationLong
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -115,26 +115,45 @@ class CommandTrackerFlowTest
completionSource
}

def send(elem: CompletionStreamElement) =
def send(elem: CompletionStreamElement): Future[QueueOfferResult] =
for {
state <- stateRef.get().future
res <- state.queue.offer(elem)
} yield (res)
} yield res

def breakCompletionsStream(): Future[Unit] =
stateRef
.getAndSet(Promise[State]())
.future
.map(state => state.queue.fail(new RuntimeException("boom")))

def getLastOffset = stateRef.get().future.map(_.startOffset)
def getLastOffset: Future[LedgerOffset] =
stateRef.get().future.map(_.startOffset)

}

import Compat._

"Command tracking flow" when {

"no configuration is available" should {
"fail immediately" in {
val Handle(submissions, results, _, _) =
runCommandTrackingFlow(allSubmissionsSuccessful, maxDeduplicationTime = None)

submissions.sendNext(submitRequest)

results.requestNext().value shouldEqual Left(
NotOkResponse(
commandId,
Status.of(
Code.UNAVAILABLE.value,
"The ledger configuration is not available.",
Seq.empty,
),
)
)
}
}

"two commands are submitted with the same ID" should {

"fail the stream" in {
Expand Down Expand Up @@ -523,7 +542,8 @@ class CommandTrackerFlowTest
Ctx[(Int, String), SubmitRequest],
Ctx[(Int, String), Try[Empty]],
NotUsed,
]
],
maxDeduplicationTime: Option[JDuration] = Some(JDuration.ofSeconds(10)),
) = {

val completionsMock = new CompletionStreamMock()
Expand All @@ -533,7 +553,7 @@ class CommandTrackerFlowTest
submissionFlow,
completionsMock.createCompletionsSource,
LedgerOffset(Boundary(LEDGER_BEGIN)),
() => JDuration.ofSeconds(10),
() => maxDeduplicationTime,
)

val handle = submissionSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
TrackedCompletionFailure,
}
import com.daml.ledger.client.services.commands.{CommandCompletionSource, CommandTrackerFlow}
import com.daml.ledger.configuration.{Configuration => LedgerConfiguration}
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
Expand All @@ -43,7 +42,6 @@ import com.daml.platform.apiserver.services.ApiCommandService._
import com.daml.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap}
import com.daml.platform.server.api.ApiException
import com.daml.platform.server.api.services.grpc.GrpcCommandService
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.util.Ctx
import com.daml.util.akkastreams.MaxInFlight
import com.google.protobuf.empty.Empty
Expand Down Expand Up @@ -93,11 +91,7 @@ private[apiserver] final class ApiCommandService private (
logging.readAsStrings(request.getCommands.readAs),
) { implicit loggingContext =>
if (running) {
ledgerConfigurationSubscription
.latestConfiguration()
.fold[Future[Either[TrackedCompletionFailure, CompletionSuccess]]](
Future.failed(ErrorFactories.missingLedgerConfig())
)(ledgerConfig => track(request, ledgerConfig))
track(request)
} else {
Future.failed(
new ApiException(Status.UNAVAILABLE.withDescription("Service has been shut down."))
Expand All @@ -106,8 +100,7 @@ private[apiserver] final class ApiCommandService private (
}

private def track(
request: SubmitAndWaitRequest,
ledgerConfig: LedgerConfiguration,
request: SubmitAndWaitRequest
)(implicit
loggingContext: LoggingContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
Expand Down Expand Up @@ -137,7 +130,7 @@ private[apiserver] final class ApiCommandService private (
)
.mapConcat(CommandCompletionSource.toStreamElements),
ledgerEnd,
() => ledgerConfig.maxDeduplicationTime,
() => ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime),
)
val trackingFlow = MaxInFlight(
configuration.maxCommandsInFlight,
Expand Down

0 comments on commit a00608c

Please sign in to comment.