Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Make sure the HistoryActor only stores task failures once.
Browse files Browse the repository at this point in the history
Summary:
With introducing the instance abstraction, the HistoryActor started to handle both InstanceChanged events and MesosStatusUpdateEvents for storing TaskFailures. This leads to failures being stored twice if both events are translated to a TaskFailure (the second write overwrites the first because we only keep the latest TaskFailure). Mid-term, handling the MesosStatusUpdateEvent should be removed in favor of InstanceChanged, but the current implementation re provided data is not sufficient to do so and requires more changes. This patch prevents the actor from storing the failure twice by going back to handling MesosStatusUpdateEvent onl< for the time being.

Related to #4792

Test Plan: sbt test

Reviewers: timcharper, aquamatthias, jasongilanfarr

Reviewed By: aquamatthias, jasongilanfarr

Subscribers: jenkins, marathon-team

Differential Revision: https://phabricator.mesosphere.com/D411
  • Loading branch information
meichstedt committed Jan 12, 2017
1 parent 40d19f4 commit bd1c69a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ class HistoryActor(eventBus: EventStream, taskFailureRepository: TaskFailureRepo
extends Actor {

override def preStart(): Unit = {

// TODO(PODS): remove InstanceChanged (MesosStatusUpdate should have this information)
eventBus.subscribe(self, classOf[InstanceChanged])
// TODO(cleanup): adjust InstanceChanged to be able to replace using MesosStatusUpdateEvent here (#4792)
eventBus.subscribe(self, classOf[MesosStatusUpdateEvent])
eventBus.subscribe(self, classOf[UnhealthyInstanceKillEvent])
eventBus.subscribe(self, classOf[AppTerminatedEvent])
Expand All @@ -27,9 +25,6 @@ class HistoryActor(eventBus: EventStream, taskFailureRepository: TaskFailureRepo
case TaskFailure.FromMesosStatusUpdateEvent(taskFailure) =>
taskFailureRepository.store(taskFailure)

case TaskFailure.FromInstanceChangedEvent(taskFailure) =>
taskFailureRepository.store(taskFailure)

case _: MesosStatusUpdateEvent => // ignore non-failure status updates

case AppTerminatedEvent(appId, eventType, timestamp) =>
Expand Down
23 changes: 12 additions & 11 deletions src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import akka.testkit._
import akka.util.Timeout
import mesosphere.Unstable
import mesosphere.marathon.MarathonSchedulerActor._
import mesosphere.marathon.core.condition.Condition
import mesosphere.marathon.core.election.{ ElectionService, LocalLeadershipEvent }
import mesosphere.marathon.core.event._
import mesosphere.marathon.core.health.HealthCheckManager
import mesosphere.marathon.core.history.impl.HistoryActor
import mesosphere.marathon.core.instance.update.InstanceChangedEventsGenerator
import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder }
import mesosphere.marathon.core.launcher.impl.LaunchQueueTestHelper
import mesosphere.marathon.core.launchqueue.LaunchQueue
import mesosphere.marathon.core.readiness.ReadinessCheckExecutor
import mesosphere.marathon.core.task.KillServiceMock
import mesosphere.marathon.core.task.bus.TaskStatusUpdateTestHelper
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.io.storage.StorageProvider
import mesosphere.marathon.state.PathId._
Expand Down Expand Up @@ -246,16 +247,11 @@ class MarathonSchedulerActorTest extends MarathonActorSupport
import f._
val app = AppDefinition(id = "/test-app".toPath, instances = 1)
val instance = TestInstanceBuilder.newBuilder(app.id).addTaskStaged().getInstance()
// TODO(PODS): add proper way to create correct InstanceChanged event
val instanceChangedEvent = InstanceChanged(
instance.instanceId,
instance.runSpecVersion,
instance.runSpecId,
Condition.Failed,
instance
)
val failedInstance = TaskStatusUpdateTestHelper.failed(instance).updatedInstance
val events = InstanceChangedEventsGenerator.events(
failedInstance, task = Some(failedInstance.appTask), now = Timestamp.now(), previousCondition = Some(instance.state.condition))

f.killService.customStatusUpdates.put(instance.instanceId, Seq(instanceChangedEvent))
f.killService.customStatusUpdates.put(instance.instanceId, events)

queue.get(app.id) returns Some(LaunchQueueTestHelper.zeroCounts)
groupRepo.root() returns Future.successful(createRootGroup(apps = Map(app.id -> app)))
Expand All @@ -270,7 +266,12 @@ class MarathonSchedulerActorTest extends MarathonActorSupport

expectMsg(5.seconds, TasksKilled(app.id, Seq(instance.instanceId)))

val Some(taskFailureEvent) = TaskFailure.FromInstanceChangedEvent(instanceChangedEvent)
val mesosStatusUpdateEvent: MesosStatusUpdateEvent = events.collectFirst {
case event: MesosStatusUpdateEvent => event
}.getOrElse {
fail(s"$events did not contain a MesosStatusUpdateEvent")
}
val Some(taskFailureEvent) = TaskFailure.FromMesosStatusUpdateEvent(mesosStatusUpdateEvent)
awaitAssert(verify(taskFailureEventRepository, times(1)).store(taskFailureEvent), 5.seconds, 10.millis)

// KillTasks does no longer scale
Expand Down

0 comments on commit bd1c69a

Please sign in to comment.