Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fixes #1497 - Do not query app versions in MarathonHealthCheckManager…
Browse files Browse the repository at this point in the history
….statuses

and make MarathonHealthCheckManager data structures more efficient
  • Loading branch information
Peter Kolloch committed May 27, 2015
1 parent 3754efb commit 4279b6b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ class MarathonHealthCheckManager @Inject() (
taskTracker: TaskTracker,
appRepository: AppRepository) extends HealthCheckManager {

// composite key for partitioning the set of active health checks
protected[this] case class AppVersion(id: PathId, version: Timestamp)

protected[this] case class ActiveHealthCheck(
healthCheck: HealthCheck,
actor: ActorRef)

protected[this] var appHealthChecks = Map[AppVersion, Set[ActiveHealthCheck]]()
protected[this] var appHealthChecks: Map[PathId, Map[Timestamp, Set[ActiveHealthCheck]]] =
Map.empty.withDefaultValue(Map.empty.withDefaultValue(Set.empty))

private[this] val rwLock = new ReentrantReadWriteLock
private[this] val readLock = rwLock.readLock
Expand All @@ -58,11 +56,13 @@ class MarathonHealthCheckManager @Inject() (
withReadLock { listActive(appId).map(_.healthCheck) }

protected[this] def listActive(appId: PathId): Set[ActiveHealthCheck] =
withReadLock { appHealthChecks.withFilter(_._1.id == appId).flatMap(_._2).toSet }
withReadLock {
appHealthChecks(appId).values.flatten.toSet
}

protected[this] def listActive(appId: PathId, appVersion: Timestamp): Set[ActiveHealthCheck] =
withReadLock {
appHealthChecks.getOrElse(AppVersion(appId, appVersion), Set.empty)
appHealthChecks(appId)(appVersion)
}

override def add(appId: PathId, appVersion: Timestamp, healthCheck: HealthCheck): Unit =
Expand All @@ -79,7 +79,10 @@ class MarathonHealthCheckManager @Inject() (
appId, appVersion.toString, driverHolder, scheduler, healthCheck, taskTracker, eventBus))
val newHealthChecksForApp =
healthChecksForApp + ActiveHealthCheck(healthCheck, ref)
appHealthChecks += (AppVersion(appId, appVersion) -> newHealthChecksForApp)

val appMap = appHealthChecks(appId) + (appVersion -> newHealthChecksForApp)
appHealthChecks += appId -> appMap

eventBus.publish(AddHealthCheck(appId, appVersion, healthCheck))
}
}
Expand All @@ -89,27 +92,37 @@ class MarathonHealthCheckManager @Inject() (

override def remove(appId: PathId, appVersion: Timestamp, healthCheck: HealthCheck): Unit =
withWriteLock {
val healthChecksForApp: Set[ActiveHealthCheck] = listActive(appId, appVersion)
val toRemove: Set[ActiveHealthCheck] = healthChecksForApp.filter(_.healthCheck == healthCheck)
val healthChecksForVersion: Set[ActiveHealthCheck] = listActive(appId, appVersion)
val toRemove: Set[ActiveHealthCheck] = healthChecksForVersion.filter(_.healthCheck == healthCheck)
for (ahc <- toRemove) {
log.info(s"Removing health check for app [$appId] and version [$appVersion]: [$healthCheck]")
deactivate(ahc)
eventBus.publish(RemoveHealthCheck(appId))
}
val newHealthChecksForApp = healthChecksForApp -- toRemove
val newHealthChecksForVersion = healthChecksForVersion -- toRemove
val currentHealthChecksForApp = appHealthChecks(appId)
val newHealthChecksForApp = if (newHealthChecksForVersion.isEmpty) {
currentHealthChecksForApp - appVersion
}
else {
currentHealthChecksForApp + (appVersion -> newHealthChecksForVersion)
}

appHealthChecks =
if (newHealthChecksForApp.isEmpty) appHealthChecks - AppVersion(appId, appVersion)
else appHealthChecks + (AppVersion(appId, appVersion) -> newHealthChecksForApp)
if (newHealthChecksForApp.isEmpty) appHealthChecks - appId
else appHealthChecks + (appId -> newHealthChecksForApp)
}

override def removeAll(): Unit =
withWriteLock { appHealthChecks.keys.map(_.id) foreach removeAllFor }
withWriteLock { appHealthChecks.keys foreach removeAllFor }

override def removeAllFor(appId: PathId): Unit =
withWriteLock {
appHealthChecks.foreach { mapping =>
val (AppVersion(id, version), ahcs) = mapping
if (id == appId) ahcs.foreach { ahc => remove(id, version, ahc.healthCheck) }
for {
(version, activeHealthChecks) <- appHealthChecks(appId)
activeHealthCheck <- activeHealthChecks
} {
remove(appId, version, activeHealthCheck.healthCheck)
}
}

Expand All @@ -122,14 +135,12 @@ class MarathonHealthCheckManager @Inject() (

// remove health checks for which the app version is not current and no tasks remain
// since only current version tasks are launched.
appHealthChecks.foreach { mapping =>
val (AppVersion(id, version), ahcs) = mapping
if (id == app.id) {
val isCurrentVersion = version == app.version
lazy val hasTasks = versionTasksMap.contains(version.toString)
if (!isCurrentVersion && !hasTasks)
ahcs.foreach { ahc => remove(id, version, ahc.healthCheck) }
}
for {
(version, activeHealthChecks) <- appHealthChecks(appId)
if version != app.version && !versionTasksMap.contains(app.id.toString)
activeHealthCheck <- activeHealthChecks
} {
remove(appId, version, activeHealthCheck.healthCheck)
}

// add missing health checks for the current
Expand Down Expand Up @@ -197,25 +208,20 @@ class MarathonHealthCheckManager @Inject() (
}

override def statuses(appId: PathId): Future[Map[String, Seq[Health]]] = withReadLock {
val futureVersions = appRepository.listVersions(appId)
implicit val timeout: Timeout = Timeout(2, SECONDS)
val futureHealths = for {
ActiveHealthCheck(_, actor) <- appHealthChecks(appId).values.iterator.flatten.toVector
} yield (actor ? GetAppHealth).mapTo[AppHealth]

futureVersions flatMap { versions =>
val futureHealths = for {
version <- versions
ActiveHealthCheck(_, actor) <- listActive(appId, version)
} yield (actor ? GetAppHealth).mapTo[AppHealth]
Future.sequence(futureHealths) map { healths =>
val groupedHealth = healths.flatMap(_.health).groupBy(_.taskId)

Future.sequence(futureHealths) map { healths =>
val groupedHealth = healths.flatMap(_.health).groupBy(_.taskId)

taskTracker.get(appId).toSeq.map { task =>
groupedHealth.get(task.getId) match {
case Some(xs) => task.getId -> xs.toSeq
case None => task.getId -> Nil
}
}.toMap
}
taskTracker.get(appId).toSeq.map { task =>
groupedHealth.get(task.getId) match {
case Some(xs) => task.getId -> xs.toSeq
case None => task.getId -> Nil
}
}.toMap
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/mesosphere/marathon/state/Migration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import mesosphere.marathon.Protos.StorageVersion
import mesosphere.marathon.state.PathId._
import mesosphere.marathon.state.StorageVersions._
import mesosphere.marathon.tasks.TaskTracker
import mesosphere.marathon.tasks.TaskTracker.{InternalApp, App}
import mesosphere.marathon.tasks.TaskTracker.{ InternalApp, App }
import mesosphere.marathon.{ BuildInfo, MarathonConf, StorageException }
import mesosphere.util.BackToTheFuture.futureToFutureOption
import mesosphere.util.ThreadPoolContext.context
Expand Down

0 comments on commit 4279b6b

Please sign in to comment.