Skip to content

Commit

Permalink
Merge branch 'master' into bugfix/d2iq-archive#4978/validate_container
Browse files Browse the repository at this point in the history
  • Loading branch information
unterstein authored Jan 31, 2017
2 parents 5c44592 + 3babb4f commit ab7e276
Show file tree
Hide file tree
Showing 33 changed files with 1,432 additions and 1,222 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ lazy val commonSettings = inConfig(SerialIntegrationTest)(Defaults.testTasks) ++
"-l", "mesosphere.marathon.UnstableTest")),
parallelExecution in IntegrationTest := true,
testForkedParallel in IntegrationTest := true,
concurrentRestrictions in IntegrationTest := Seq(Tags.limitAll(java.lang.Runtime.getRuntime.availableProcessors() / 2)),
test in IntegrationTest := {
(test in IntegrationTest).value
(test in SerialIntegrationTest).value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class CoreModuleImpl @Inject() (

private[this] lazy val offerMatcherManagerModule = new OfferMatcherManagerModule(
// infrastructure
clock, random, metrics, marathonConf,
clock, random, metrics, marathonConf, actorSystem.scheduler,
leadershipModule
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mesosphere.marathon
package core.launcher.impl

import akka.Done
import akka.pattern.AskTimeoutException
import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.instance.update.InstanceUpdateOperation
Expand Down Expand Up @@ -49,23 +48,20 @@ private[launcher] class OfferProcessorImpl(
logger.debug(s"Received offer\n${offer}")
incomingOffersMeter.mark()

val matchingDeadline = clock.now() + offerMatchingTimeout
val now = clock.now()
val matchingDeadline = now + offerMatchingTimeout
val savingDeadline = matchingDeadline + saveTasksToLaunchTimeout

val matchFuture: Future[MatchedInstanceOps] = matchTimeMeter.timeFuture {
offerMatcher.matchOffer(matchingDeadline, offer)
offerMatcher.matchOffer(now, matchingDeadline, offer)
}

matchFuture
.recover {
case e: AskTimeoutException =>
matchErrorsMeter.mark()
logger.warn(s"Could not process offer '${offer.getId.getValue}' in time. (See --offer_matching_timeout)")
MatchedInstanceOps(offer.getId, resendThisOffer = true)
case NonFatal(e) =>
matchErrorsMeter.mark()
logger.error(s"Could not process offer '${offer.getId.getValue}'", e)
MatchedInstanceOps(offer.getId, resendThisOffer = true)
MatchedInstanceOps.noMatch(offer.getId, resendThisOffer = true)
}.flatMap {
case MatchedInstanceOps(offerId, tasks, resendThisOffer) =>
savingTasksTimeMeter.timeFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import mesosphere.marathon.state.{ RunSpec, Timestamp }
import org.apache.mesos.{ Protos => Mesos }
import mesosphere.marathon.stream._

import scala.concurrent.Promise
import scala.concurrent.duration._

private[launchqueue] object TaskLauncherActor {
Expand Down Expand Up @@ -353,25 +354,33 @@ private class TaskLauncherActor(
}

private[this] def receiveProcessOffers: Receive = {
case ActorOfferMatcher.MatchOffer(deadline, offer) if clock.now() >= deadline || !shouldLaunchInstances =>
case ActorOfferMatcher.MatchOffer(deadline, offer, promise) if clock.now() >= deadline || !shouldLaunchInstances =>
val deadlineReached = clock.now() >= deadline
log.debug("ignoring offer, offer deadline {}reached. {}", if (deadlineReached) "" else "NOT ", status)
sender ! MatchedInstanceOps(offer.getId)
promise.trySuccess(MatchedInstanceOps.noMatch(offer.getId))

case ActorOfferMatcher.MatchOffer(deadline, offer) =>
case ActorOfferMatcher.MatchOffer(deadline, offer, promise) =>
val reachableInstances = instanceMap.filterNotAs{ case (_, instance) => instance.state.condition.isLost }
val matchRequest = InstanceOpFactory.Request(runSpec, offer, reachableInstances, instancesToLaunch)
instanceOpFactory.matchOfferRequest(matchRequest) match {
case matched: OfferMatchResult.Match =>
offerMatchStatisticsActor ! matched
handleInstanceOp(matched.instanceOp, offer)
handleInstanceOp(matched.instanceOp, offer, promise)
case notMatched: OfferMatchResult.NoMatch =>
offerMatchStatisticsActor ! notMatched
sender() ! MatchedInstanceOps(offer.getId)
promise.trySuccess(MatchedInstanceOps.noMatch(offer.getId))
}
}

private[this] def handleInstanceOp(instanceOp: InstanceOp, offer: Mesos.Offer): Unit = {
/**
* Mutate internal state in response to having matched an instanceOp.
*
* @param instanceOp The instanceOp that is to be applied to on a previously
* received offer
* @param offer The offer that could be matched successfully.
* @param promise Promise that tells offer matcher that the offer has been accepted.
*/
private[this] def handleInstanceOp(instanceOp: InstanceOp, offer: Mesos.Offer, promise: Promise[MatchedInstanceOps]): Unit = {
def updateActorState(): Unit = {
val instanceId = instanceOp.instanceId
instanceOp match {
Expand Down Expand Up @@ -413,7 +422,7 @@ private class TaskLauncherActor(
"Request {} for instance '{}', version '{}'. {}",
instanceOp.getClass.getSimpleName, instanceOp.instanceId.idString, runSpec.version, status)

sender() ! MatchedInstanceOps(offer.getId, Seq(InstanceOpWithSource(myselfAsLaunchSource, instanceOp)))
promise.trySuccess(MatchedInstanceOps(offer.getId, Seq(InstanceOpWithSource(myselfAsLaunchSource, instanceOp))))
}

private[this] def scheduleTaskOpTimeout(instanceOp: InstanceOp): Unit = {
Expand Down Expand Up @@ -455,7 +464,7 @@ private class TaskLauncherActor(
private[this] object OfferMatcherRegistration {
private[this] val myselfAsOfferMatcher: OfferMatcher = {
//set the precedence only, if this app is resident
new ActorOfferMatcher(clock, self, runSpec.residency.map(_ => runSpec.id))
new ActorOfferMatcher(self, runSpec.residency.map(_ => runSpec.id))(context.system.scheduler)
}
private[this] var registeredAsMatcher = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object OfferMatcher {
*/
case class MatchedInstanceOps(
offerId: Mesos.OfferID,
opsWithSource: Seq[InstanceOpWithSource] = Seq.empty,
opsWithSource: Seq[InstanceOpWithSource],
resendThisOffer: Boolean = false) {

/** all included [InstanceOp] without the source information. */
Expand All @@ -51,7 +51,7 @@ object OfferMatcher {

object MatchedInstanceOps {
def noMatch(offerId: Mesos.OfferID, resendThisOffer: Boolean = false): MatchedInstanceOps =
new MatchedInstanceOps(offerId, resendThisOffer = resendThisOffer)
new MatchedInstanceOps(offerId, Seq.empty[InstanceOpWithSource], resendThisOffer = resendThisOffer)
}

trait InstanceOpSource {
Expand All @@ -70,9 +70,13 @@ trait OfferMatcher {
* The offer matcher can expect either a instanceOpAccepted or a instanceOpRejected call
* for every returned `org.apache.mesos.Protos.TaskInfo`.
*
* If the matching cannot be processed within the deadline, an empty
* MatchedInstanceOps will be returned
*
*
* TODO(jdef) PODS ... 1:1 ratio between TaskInfo and instanceOpXXX may change?
*/
def matchOffer(deadline: Timestamp, offer: Mesos.Offer): Future[OfferMatcher.MatchedInstanceOps]
def matchOffer(now: Timestamp, deadline: Timestamp, offer: Mesos.Offer): Future[OfferMatcher.MatchedInstanceOps]

/**
* We can optimize the offer routing for different offer matcher in case there are reserved resources.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
package mesosphere.marathon.core.matcher.base.util
package mesosphere.marathon
package core.matcher.base.util

import akka.actor.ActorRef
import akka.pattern.{ AskTimeoutException, ask }
import akka.util.Timeout
import mesosphere.marathon.core.base.Clock
import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.matcher.base.OfferMatcher
import mesosphere.marathon.core.matcher.base.OfferMatcher.MatchedInstanceOps
import mesosphere.marathon.state.{ PathId, Timestamp }
import mesosphere.util._
import mesosphere.marathon.util.{ Timeout, TimeoutException }
import org.apache.mesos.Protos.Offer

import scala.concurrent.Future
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._

/**
* Provides a thin wrapper around an OfferMatcher implemented as an actors.
*
* @param actorRef Reference to actor that matches offers.
* @param precedenceFor Defines which matcher receives offers first. See [[mesosphere.marathon.core.matcher.base.OfferMatcher.precedenceFor]].
*/
class ActorOfferMatcher(
clock: Clock,
actorRef: ActorRef,
override val precedenceFor: Option[PathId]) extends OfferMatcher {
def matchOffer(deadline: Timestamp, offer: Offer): Future[MatchedInstanceOps] = {
import mesosphere.util.CallerThreadExecutionContext.callerThreadExecutionContext
implicit val timeout: Timeout = clock.now().until(deadline)
if (timeout.duration > ActorOfferMatcher.MinimalOfferComputationTime) {
val answerFuture = actorRef ? ActorOfferMatcher.MatchOffer(deadline, offer)
answerFuture.mapTo[MatchedInstanceOps].recover {
case _: AskTimeoutException => MatchedInstanceOps(offer.getId)
}
} else {
class ActorOfferMatcher(actorRef: ActorRef, override val precedenceFor: Option[PathId])(implicit scheduler: akka.actor.Scheduler)
extends OfferMatcher with StrictLogging {

def matchOffer(now: Timestamp, deadline: Timestamp, offer: Offer): Future[MatchedInstanceOps] = {
import scala.concurrent.ExecutionContext.Implicits.global

val timeout: FiniteDuration = now.until(deadline)

if (timeout <= ActorOfferMatcher.MinimalOfferComputationTime) {
// if deadline is exceeded return no match
Future.successful(MatchedInstanceOps(offer.getId))
Future.successful(MatchedInstanceOps.noMatch(offer.getId))
} else {

val p = Promise[MatchedInstanceOps]()
actorRef ! ActorOfferMatcher.MatchOffer(deadline, offer, p)

Timeout(timeout)(p.future).recover {
case e: TimeoutException =>
logger.warn(s"Could not process offer '${offer.getId.getValue}' within ${timeout.toHumanReadable}. (See --offer_matching_timeout)")
MatchedInstanceOps.noMatch(offer.getId)
}
}
}

Expand All @@ -47,6 +57,10 @@ object ActorOfferMatcher {
*
* This should always be replied to with a LaunchTasks message.
* TODO(jdef) pods will probably require a non-LaunchTasks message
*
* @param matchingDeadline Don't match after deadline.
* @param remainingOffer Part of the offer that has not been matched.
* @param promise The promise to fullfil with match.
*/
case class MatchOffer(matchingDeadline: Timestamp, remainingOffer: Offer)
case class MatchOffer(matchingDeadline: Timestamp, remainingOffer: Offer, promise: Promise[MatchedInstanceOps])
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import scala.concurrent.{ ExecutionContext, Future }
* Wraps multiple offer matchers and returns the first non-empty match or (if all are empty) the last empty match.
*/
class StopOnFirstMatchingOfferMatcher(chained: OfferMatcher*) extends OfferMatcher {
override def matchOffer(deadline: Timestamp, offer: Offer): Future[MatchedInstanceOps] = {
override def matchOffer(now: Timestamp, deadline: Timestamp, offer: Offer): Future[MatchedInstanceOps] = {
chained.foldLeft(Future.successful(MatchedInstanceOps.noMatch(offer.getId, resendThisOffer = false))) {
case (matchedFuture, nextMatcher) =>
matchedFuture.flatMap { matched =>
if (matched.ops.isEmpty) nextMatcher.matchOffer(deadline, offer)
if (matched.ops.isEmpty) nextMatcher.matchOffer(now, deadline, offer)
else matchedFuture
}(ExecutionContext.global)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package mesosphere.marathon.core.matcher.manager
package mesosphere.marathon
package core.matcher.manager

import akka.actor.ActorRef
import akka.actor.{ ActorRef, Scheduler }
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.leadership.LeadershipModule
import mesosphere.marathon.core.matcher.base.OfferMatcher
import mesosphere.marathon.core.matcher.base.util.ActorOfferMatcher
import mesosphere.marathon.core.matcher.manager.impl.{
OfferMatcherManagerActor,
OfferMatcherManagerActorMetrics,
OfferMatcherManagerDelegate
}
import mesosphere.marathon.core.matcher.manager.impl.{ OfferMatcherManagerActor, OfferMatcherManagerActorMetrics, OfferMatcherManagerDelegate }
import mesosphere.marathon.metrics.Metrics
import rx.lang.scala.subjects.BehaviorSubject
import rx.lang.scala.{ Observable, Subject }
Expand All @@ -23,7 +20,9 @@ import scala.util.Random
class OfferMatcherManagerModule(
clock: Clock, random: Random, metrics: Metrics,
offerMatcherConfig: OfferMatcherManagerConfig,
leadershipModule: LeadershipModule) {
scheduler: Scheduler,
leadershipModule: LeadershipModule,
actorName: String = "offerMatcherManager") {

private[this] lazy val offersWanted: Subject[Boolean] = BehaviorSubject[Boolean](false)

Expand All @@ -32,14 +31,14 @@ class OfferMatcherManagerModule(
private[this] val offerMatcherMultiplexer: ActorRef = {
val props = OfferMatcherManagerActor.props(
offerMatcherManagerMetrics, random, clock, offerMatcherConfig, offersWanted)
leadershipModule.startWhenLeader(props, "offerMatcherManager")
leadershipModule.startWhenLeader(props, actorName)
}

/**
* Signals `true` if we are interested in (new) offers, signals `false` if we are currently not interested in
* offers.
*/
val globalOfferMatcherWantsOffers: Observable[Boolean] = offersWanted
val globalOfferMatcher: OfferMatcher = new ActorOfferMatcher(clock, offerMatcherMultiplexer, None)
val globalOfferMatcher: OfferMatcher = new ActorOfferMatcher(offerMatcherMultiplexer, None)(scheduler)
val subOfferMatcherManager: OfferMatcherManager = new OfferMatcherManagerDelegate(offerMatcherMultiplexer)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package mesosphere.marathon
package core.matcher.manager.impl

import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.actor.{ Actor, ActorLogging, Props }
import akka.event.LoggingReceive
import akka.pattern.pipe
import mesosphere.marathon.core.base.Clock
Expand All @@ -20,6 +20,7 @@ import org.apache.mesos.Protos.{ Offer, OfferID }
import rx.lang.scala.Observer

import scala.collection.immutable.Queue
import scala.concurrent.Promise
import scala.util.Random
import scala.util.control.NonFatal

Expand All @@ -42,10 +43,25 @@ private[manager] object OfferMatcherManagerActor {
Props(new OfferMatcherManagerActor(metrics, random, clock, offerMatcherConfig, offersWanted))
}

/**
*
* @constructor Create a new instance that bundles offer, deadline and ops.
* @param offer The offer that is matched.
* @param deadline If an offer is not processed until the deadline the promise
* is succeeded without a match.
* @param promise The promise will receive the matched instance ops if a match
* if found. The promise might be fulfilled by the sender, e.g. [[mesosphere.marathon.core.matcher.base.util.ActorOfferMatcher]]
* if the deadline is reached before the offer has been processed.
* @param matcherQueue The offer matchers which should be applied to the
* offer.
* @param ops ???
* @param matchPasses ???
* @param resendThisOffer ???
*/
private case class OfferData(
offer: Offer,
deadline: Timestamp,
sender: ActorRef,
promise: Promise[OfferMatcher.MatchedInstanceOps],
matcherQueue: Queue[OfferMatcher],
ops: Seq[InstanceOpWithSource] = Seq.empty,
matchPasses: Int = 0,
Expand Down Expand Up @@ -144,16 +160,16 @@ private[impl] class OfferMatcherManagerActor private (
}

private[this] def receiveProcessOffer: Receive = {
case ActorOfferMatcher.MatchOffer(deadline, offer: Offer) if !offersWanted =>
case ActorOfferMatcher.MatchOffer(deadline, offer: Offer, promise: Promise[OfferMatcher.MatchedInstanceOps]) if !offersWanted =>
log.debug(s"Ignoring offer ${offer.getId.getValue}: No one interested.")
sender() ! OfferMatcher.MatchedInstanceOps(offer.getId, resendThisOffer = false)
promise.trySuccess(OfferMatcher.MatchedInstanceOps.noMatch(offer.getId, resendThisOffer = false))

case ActorOfferMatcher.MatchOffer(deadline, offer: Offer) =>
case ActorOfferMatcher.MatchOffer(deadline, offer: Offer, promise: Promise[OfferMatcher.MatchedInstanceOps]) =>
log.debug(s"Start processing offer ${offer.getId.getValue}")

// setup initial offer data
val randomizedMatchers = offerMatchers(offer)
val data = OfferMatcherManagerActor.OfferData(offer, deadline, sender(), randomizedMatchers)
val data = OfferMatcherManagerActor.OfferData(offer, deadline, promise, randomizedMatchers)
offerQueues += offer.getId -> data
metrics.currentOffersGauge.setValue(offerQueues.size)

Expand Down Expand Up @@ -213,7 +229,7 @@ private[impl] class OfferMatcherManagerActor private (
case MatchTimeout(offerId) =>
// When the timeout is reached, we will answer with all matching instances we found until then.
// Since we cannot be sure if we found all matching instances, we set resendThisOffer to true.
offerQueues.get(offerId).foreach(sendMatchResult(_, resendThisOffer = true))
offerQueues.get(offerId).foreach(completeWithMatchResult(_, resendThisOffer = true))
}

private[this] def scheduleNextMatcherOrFinish(data: OfferData): Unit = {
Expand All @@ -239,18 +255,18 @@ private[impl] class OfferMatcherManagerActor private (
import context.dispatcher
log.debug(s"query next offer matcher $nextMatcher for offer id ${data.offer.getId.getValue}")
nextMatcher
.matchOffer(newData.deadline, newData.offer)
.matchOffer(clock.now(), newData.deadline, newData.offer)
.recover {
case NonFatal(e) =>
log.warning("Received error from {}", e)
MatchedInstanceOps(data.offer.getId, resendThisOffer = true)
MatchedInstanceOps.noMatch(data.offer.getId, resendThisOffer = true)
}.pipeTo(self)
case None => sendMatchResult(data, data.resendThisOffer)
case None => completeWithMatchResult(data, data.resendThisOffer)
}
}

private[this] def sendMatchResult(data: OfferData, resendThisOffer: Boolean): Unit = {
data.sender ! OfferMatcher.MatchedInstanceOps(data.offer.getId, data.ops, resendThisOffer)
private[this] def completeWithMatchResult(data: OfferData, resendThisOffer: Boolean): Unit = {
data.promise.trySuccess(OfferMatcher.MatchedInstanceOps(data.offer.getId, data.ops, resendThisOffer))
offerQueues -= data.offer.getId
metrics.currentOffersGauge.setValue(offerQueues.size)
val maxRanges = if (log.isDebugEnabled) 1000 else 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private[reconcile] class OfferMatcherReconciler(instanceTracker: InstanceTracker

import scala.concurrent.ExecutionContext.Implicits.global

override def matchOffer(deadline: Timestamp, offer: Offer): Future[MatchedInstanceOps] = {
override def matchOffer(now: Timestamp, deadline: Timestamp, offer: Offer): Future[MatchedInstanceOps] = {

val frameworkId = FrameworkId("").mergeFromProto(offer.getFrameworkId)

Expand Down
Loading

0 comments on commit ab7e276

Please sign in to comment.