Skip to content

Commit

Permalink
participant-integration-api: Ensure that all waiting, failed, and clo…
Browse files Browse the repository at this point in the history
…sed trackers are cleaned up. (#10662)

* participant-integration-api: Clean up failed trackers.

Otherwise they can hang around forever.

CHANGELOG_BEGIN
CHANGELOG_END

* participant-integration-api: Ensure that all trackers are closed.

* participant-integration-api: Ensure that waiting trackers are closed.

* participant-integration-api: Store the tracker map future in the state.

* participant-integration-api: Fix a race in `TrackerMap`.

If the supplied `Future` to `AsyncResourceState` is too fast, the state
may not be set. We need to initialize the state immediately.

* participant-integration-api: Add more comments to `TrackerMap`.
  • Loading branch information
SamirTalwar authored Aug 25, 2021
1 parent b27cde6 commit 53be19f
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.platform.apiserver.services.tracking

import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference

import akka.stream.Materializer
Expand All @@ -18,11 +19,16 @@ import com.daml.platform.apiserver.services.tracking.TrackerMap._

import scala.collection.immutable.HashMap
import scala.concurrent.duration.{DurationLong, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success}

/** A map for [[Tracker]]s with thread-safe tracking methods and automatic cleanup. A tracker tracker, if you will.
* @param retentionPeriod The minimum finite duration for which to retain idle trackers.
/** A map for [[Tracker]]s with thread-safe tracking methods and automatic cleanup.
* A tracker tracker, if you will.
*
* @param retentionPeriod The minimum duration for which to retain ready-but-idling trackers.
* @param getKey A function to compute the tracker key from the commands.
* @param newTracker A function to construct a new tracker.
* Called when there is no tracker for the given key.
*/
private[services] final class TrackerMap[Key](
retentionPeriod: FiniteDuration,
Expand All @@ -41,7 +47,7 @@ private[services] final class TrackerMap[Key](

require(
retentionPeriod < Long.MaxValue.nanoseconds,
s"Retention period$retentionPeriod is too long. Must be below ${Long.MaxValue} nanoseconds.",
s"Retention period $retentionPeriod is too long. Must be below ${Long.MaxValue} nanoseconds.",
)

private val retentionNanos = retentionPeriod.toNanos
Expand All @@ -59,32 +65,46 @@ private[services] final class TrackerMap[Key](
.getOrElse(
key,
lock.synchronized {
trackerBySubmitter.getOrElse(
key, {
val r = new TrackerMap.AsyncResource(newTracker(key).map { t =>
logger.info(s"Registered tracker for submitter $key")
new TrackerWithLastSubmission(t)
})
trackerBySubmitter += key -> r
r
},
)
trackerBySubmitter.getOrElse(key, registerNewTracker(key))
},
)
.flatMap(_.track(request))
.withResource(_.track(request))
}

private def registerNewTracker(
key: Key
)(implicit executionContext: ExecutionContext): AsyncResource[TrackerWithLastSubmission] = {
val resource = new AsyncResource(
newTracker(key)
.andThen {
case Success(_) =>
logger.info(s"Registered a tracker for submitter $key.")
case Failure(exception) =>
logger.error("Failed to register a tracker.", exception)
}
.map(new TrackerWithLastSubmission(_))
)
trackerBySubmitter += key -> resource
resource
}

def cleanup(): Unit = lock.synchronized {
val nanoTime = System.nanoTime()
trackerBySubmitter foreach { case (submitter, trackerResource) =>
trackerResource.ifPresent { tracker =>
if (nanoTime - tracker.getLastSubmission > retentionNanos) {
logger.info(
s"Shutting down tracker for $submitter after inactivity of $retentionPeriod"
)(trackerResource.loggingContext)
val currentTime = System.nanoTime()
trackerBySubmitter.foreach { case (submitter, trackerResource) =>
trackerResource.currentState match {
case Waiting => // there is nothing to clean up
case Ready(tracker) =>
// close and forget expired trackers
if (currentTime - tracker.getLastSubmission > retentionNanos) {
logger.info(
s"Shutting down tracker for $submitter after inactivity of $retentionPeriod"
)(trackerResource.loggingContext)
tracker.close()
trackerBySubmitter -= submitter
}
case Failed(_) | Closed =>
// simply forget already-failed or closed trackers
trackerBySubmitter -= submitter
tracker.close()
}
}
}
}
Expand Down Expand Up @@ -125,53 +145,57 @@ private[services] object TrackerMap {
}
}

sealed trait AsyncResourceState[+T <: AutoCloseable]
final case object Waiting extends AsyncResourceState[Nothing]
final case object Closed extends AsyncResourceState[Nothing]
final case class Ready[T <: AutoCloseable](t: T) extends AsyncResourceState[T]
private sealed trait AsyncResourceState[+T <: AutoCloseable]
private final case object Waiting extends AsyncResourceState[Nothing]
private final case class Ready[T <: AutoCloseable](resource: T) extends AsyncResourceState[T]
private final case object Closed extends AsyncResourceState[Nothing]
private final case class Failed(exception: Throwable) extends AsyncResourceState[Nothing]

/** A holder for an AutoCloseable that can be opened and closed async.
* If closed before the underlying Future completes, will close the resource on completion.
*/
final class AsyncResource[T <: AutoCloseable](
future: Future[T]
start: Future[T]
)(implicit val loggingContext: LoggingContext) {
private val logger = ContextualizedLogger.get(this.getClass)

// Must progress Waiting => Ready => Closed or Waiting => Closed.
val state: AtomicReference[AsyncResourceState[T]] = new AtomicReference(Waiting)

future.andThen {
case Success(t) =>
if (!state.compareAndSet(Waiting, Ready(t))) {
// This is the punch line of AsyncResource.
// If we've been closed in the meantime, we must close the underlying resource also.
// This "on-failure-to-complete" behavior is not present in scala or java Futures.
t.close()
}
// Someone should be listening to this failure downstream
// TODO(mthvedt): Refactor so at least one downstream listener is always present,
// and exceptions are never dropped.
case Failure(ex) =>
logger.error("failure to get async resource", ex)
state.set(Closed)
// Must progress as follows:
// - Waiting -> Closed,
// - Waiting -> Ready -> Closed, or
// - Waiting -> Failed.
private val state: AtomicReference[AsyncResourceState[T]] = new AtomicReference(Waiting)
private val future = start.andThen {
case Success(resource) =>
state.set(Ready(resource))
case Failure(exception) =>
state.set(Failed(exception))
}(DirectExecutionContext)

def flatMap[U](f: T => Future[U])(implicit ex: ExecutionContext): Future[U] =
state.get() match {
case Waiting => future.flatMap(f)
case Closed => throw new IllegalStateException()
case Ready(t) => f(t)
}
private[TrackerMap] def currentState: AsyncResourceState[T] = state.get()

def ifPresent[U](f: T => U): Option[U] = state.get() match {
case Ready(t) => Some(f(t))
case _ => None
}
// This will recurse in the `Waiting` state, but only after `future` completes,
// which means that the state will have changed to either `Ready` or `Failed`.
def withResource[U](f: T => Future[U])(implicit ex: ExecutionContext): Future[U] =
currentState match {
case Waiting => future.flatMap(_ => withResource(f)) // try again
case Ready(resource) => f(resource)
case Failed(exception) => Future.failed(exception)
case Closed => Future.failed(new IllegalStateException("The resource is closed."))
}

def close(): Unit = state.getAndSet(Closed) match {
case Ready(t) => t.close()
case _ =>
case Waiting =>
try {
Await.result(
future.transform(Success(_))(DirectExecutionContext),
10.seconds,
) match {
case Success(resource) => resource.close()
case Failure(_) =>
}
} catch {
case _: InterruptedException | _: TimeoutException => // don't worry about it
}
case Ready(resource) => resource.close()
case Failed(_) | Closed =>
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.platform.apiserver.services.tracking

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.commands.Commands
Expand All @@ -13,6 +13,7 @@ import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
}
import com.daml.logging.LoggingContext
import com.daml.platform.apiserver.services.tracking.TrackerMapSpec._
import com.daml.timer.Delayed
import com.google.rpc.status.Status
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
Expand Down Expand Up @@ -135,6 +136,143 @@ class TrackerMapSpec extends AsyncWordSpec with Matchers {
finalTrackerCounts should be(Map(Set("Alice") -> 1, Set("Bob") -> 2))
}
}

"clean up failed trackers" in {
val trackerCounts = TrieMap.empty[String, AtomicInteger]
val tracker = new TrackerMap[String](
retentionPeriod = 1.minute,
getKey = commands => commands.applicationId,
newTracker = applicationId => {
trackerCounts.getOrElseUpdate(applicationId, new AtomicInteger(0)).incrementAndGet()
if (applicationId.isEmpty)
Future.failed(new IllegalArgumentException("Missing application ID."))
else
Future.successful(new FakeTracker(transactionIds = Iterator.continually("")))
},
)

for {
_ <- tracker.track(
SubmitAndWaitRequest.of(
commands = Some(Commands(commandId = "1", applicationId = "test"))
)
)
failure1 <- tracker
.track(
SubmitAndWaitRequest.of(commands = Some(Commands(commandId = "2", applicationId = "")))
)
.failed
_ = tracker.cleanup()
failure2 <- tracker
.track(
SubmitAndWaitRequest.of(commands = Some(Commands(commandId = "3", applicationId = "")))
)
.failed
} yield {
val finalTrackerCounts = trackerCounts.view.mapValues(_.get()).toMap
finalTrackerCounts should be(Map("test" -> 1, "" -> 2))
failure1.getMessage should be("Missing application ID.")
failure2.getMessage should be("Missing application ID.")
}
}

"close all trackers" in {
val requestCount = 20
val expectedTrackerCount = 5
val openTrackerCount = new AtomicInteger(0)
val closedTrackerCount = new AtomicInteger(0)
val tracker = new TrackerMap[String](
retentionPeriod = 1.minute,
getKey = commands => commands.applicationId,
newTracker = _ =>
Future.successful {
openTrackerCount.incrementAndGet()
new Tracker {
override def track(
request: SubmitAndWaitRequest
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] =
Future.successful(
Right(
CompletionSuccess(
commandId = request.getCommands.commandId,
transactionId = "",
originalStatus = Status.defaultInstance,
)
)
)

override def close(): Unit = {
closedTrackerCount.incrementAndGet()
()
}
}
},
)

val requests = (0 until requestCount).map { i =>
val key = (i % expectedTrackerCount).toString
SubmitAndWaitRequest.of(
commands = Some(Commands(commandId = i.toString, applicationId = key))
)
}
for {
_ <- Future.sequence(requests.map(tracker.track))
_ = tracker.close()
} yield {
openTrackerCount.get() should be(expectedTrackerCount)
closedTrackerCount.get() should be(expectedTrackerCount)
}
}

"close waiting trackers" in {
val openTracker = new AtomicBoolean(false)
val closedTracker = new AtomicBoolean(false)
val tracker = new TrackerMap[Unit](
retentionPeriod = 1.minute,
getKey = _ => (),
newTracker = _ =>
Delayed.by(1.second) {
openTracker.set(true)
new Tracker {
override def track(
request: SubmitAndWaitRequest
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] =
Future.successful(
Right(
CompletionSuccess(
commandId = request.getCommands.commandId,
transactionId = "",
originalStatus = Status.defaultInstance,
)
)
)

override def close(): Unit = {
closedTracker.set(true)
()
}
}
},
)

val completionF = tracker.track(
SubmitAndWaitRequest.of(
commands = Some(Commands(commandId = "command"))
)
)
tracker.close()
Delayed.Future.by(1.second)(completionF).map { completion =>
openTracker.get() should be(true)
closedTracker.get() should be(true)
completion should matchPattern { case Right(_) => }
}
}
}
}

Expand Down

0 comments on commit 53be19f

Please sign in to comment.