Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fixes #4948 | Lazy event parsing
Browse files Browse the repository at this point in the history
Change SSE event stream handler to use events that are
lazy parsed to JSON. This will reduce CPU time when most events
are filtered and/or when more then one subscriper is connected.
  • Loading branch information
janisz committed Jan 23, 2017
1 parent ad64d8b commit a4cb973
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 29 deletions.
15 changes: 14 additions & 1 deletion src/main/scala/mesosphere/marathon/core/event/Events.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mesosphere.marathon.core.event
package mesosphere.marathon
package core.event

import akka.event.EventStream
import mesosphere.marathon.api.v2.json.Formats.eventToJson
import mesosphere.marathon.core.condition.Condition
import mesosphere.marathon.core.health.HealthCheck
import mesosphere.marathon.core.instance.update.InstanceChange
Expand All @@ -9,9 +11,20 @@ import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp }
import mesosphere.marathon.upgrade.{ DeploymentPlan, DeploymentStep }
import org.apache.mesos.{ Protos => Mesos }
import play.api.libs.json.Json

import scala.collection.immutable.Seq

trait Event {
val eventType: String
val message: String
}

case class JsonEvent(event: MarathonEvent) extends Event {
val eventType: String = event.eventType
lazy val message: String = Json.stringify(eventToJson(event))
}

sealed trait MarathonEvent {
val eventType: String
val timestamp: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mesosphere.marathon.core.event.impl.stream
import akka.actor._
import com.google.inject.Inject
import mesosphere.marathon.core.election.{ ElectionService, LocalLeadershipEvent }
import mesosphere.marathon.core.event.Event
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.metrics.Metrics.AtomicIntGauge
import mesosphere.marathon.metrics.{ MetricPrefixes, Metrics }
Expand All @@ -16,7 +17,7 @@ import scala.util.Try
trait HttpEventStreamHandle {
def id: String
def remoteAddress: String
def sendEvent(event: String, message: String): Unit
def sendEvent(event: Event): Unit
def close(): Unit
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package mesosphere.marathon.core.event.impl.stream
package mesosphere.marathon
package core.event.impl.stream

import java.io.EOFException

import akka.actor.{ Actor, ActorLogging, Status }
import akka.event.EventStream
import akka.pattern.pipe
import mesosphere.marathon.api.v2.json.Formats._
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamHandleActor._
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent }
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent, JsonEvent }
import mesosphere.util.ThreadPoolContext
import play.api.libs.json.Json

import scala.concurrent.Future
import scala.util.Try
Expand All @@ -20,7 +19,7 @@ class HttpEventStreamHandleActor(
stream: EventStream,
maxOutStanding: Int) extends Actor with ActorLogging {

private[impl] var outstanding = Seq.empty[MarathonEvent]
private[impl] var outstanding = Seq.empty[JsonEvent]

override def preStart(): Unit = {
stream.subscribe(self, classOf[MarathonEvent])
Expand All @@ -38,13 +37,13 @@ class HttpEventStreamHandleActor(

def waitForEvent: Receive = {
case event: MarathonEvent =>
outstanding = event +: outstanding
outstanding = JsonEvent(event) +: outstanding
sendAllMessages()
}

def stashEvents: Receive = handleWorkDone orElse {
case event: MarathonEvent if outstanding.size >= maxOutStanding => dropEvent(event)
case event: MarathonEvent => outstanding = event +: outstanding
case event: MarathonEvent => outstanding = JsonEvent(event) +: outstanding
}

def handleWorkDone: Receive = {
Expand All @@ -57,10 +56,10 @@ class HttpEventStreamHandleActor(
private[this] def sendAllMessages(): Unit = {
if (outstanding.nonEmpty) {
val toSend = outstanding.reverse
outstanding = List.empty[MarathonEvent]
outstanding = Seq.empty[JsonEvent]
context.become(stashEvents)
val sendFuture = Future {
toSend.foreach(event => handle.sendEvent(event.eventType, Json.stringify(eventToJson(event))))
toSend.foreach(event => handle.sendEvent(event))
WorkDone
}(ThreadPoolContext.ioContext)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import javax.servlet.http.{ Cookie, HttpServletRequest, HttpServletResponse }

import akka.actor.ActorRef
import mesosphere.marathon.api.RequestFacade
import mesosphere.marathon.core.event.EventConf
import mesosphere.marathon.core.event.{ EventConf, Event }
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.plugin.auth._
import mesosphere.marathon.plugin.http.HttpResponse
Expand All @@ -24,7 +24,7 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

lazy val id: String = UUID.randomUUID().toString

val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet
private val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet

def subscribed(eventType: String): Boolean = {
subscribedEventTypes.isEmpty || subscribedEventTypes.contains(eventType)
Expand All @@ -34,8 +34,8 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

override def close(): Unit = emitter.close()

override def sendEvent(event: String, message: String): Unit = {
if (subscribed(event)) blocking(emitter.event(event, message))
override def sendEvent(event: Event): Unit = {
if (subscribed(event.eventType)) blocking(emitter.event(event.eventType, event.message))
}

override def toString: String = s"HttpEventSSEHandle($id on $remoteAddress on event types from $subscribedEventTypes)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package core.event.impl.stream
import java.util.Collections
import javax.servlet.http.HttpServletRequest

import mesosphere.marathon.core.event.Event
import mesosphere.marathon.stream._
import mesosphere.marathon.test.{ MarathonSpec, Mockito }
import org.eclipse.jetty.servlets.EventSource.Emitter
import org.scalatest.{ GivenWhenThen, Matchers }
import mesosphere.marathon.stream._

class HttpEventSSEHandleTest extends MarathonSpec with Matchers with Mockito with GivenWhenThen {

Expand All @@ -22,13 +23,13 @@ class HttpEventSSEHandleTest extends MarathonSpec with Matchers with Mockito wit
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent unwanted event")
handle.sendEvent("any event", "")
handle.sendEvent(TestEvent("any event", ""))

Then("event should NOT be sent")
verify(emitter, never).event("any event", "")

When("Want to sent subscribed event")
handle.sendEvent("xyz", "")
handle.sendEvent(TestEvent("xyz", ""))

Then("event should be sent")
verify(emitter).event("xyz", "")
Expand All @@ -46,16 +47,18 @@ class HttpEventSSEHandleTest extends MarathonSpec with Matchers with Mockito wit
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent event")
handle.sendEvent("any event", "")
Then("event should NOT be sent")
handle.sendEvent(TestEvent("any event", ""))

Then("event should be sent")
verify(emitter).event("any event", "")
When("Want to sent event")

handle.sendEvent("xyz", "")
When("Want to sent event")
handle.sendEvent(TestEvent("xyz", ""))

Then("event should be sent")
verify(emitter).event("xyz", "")
}

case class TestEvent(eventType: String, message: String) extends Event
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.concurrent.CountDownLatch
import akka.actor.Props
import akka.event.EventStream
import akka.testkit.{ EventFilter, ImplicitSender, TestActorRef }
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, Subscribe }
import mesosphere.marathon.core.event._
import mesosphere.marathon.test.{ MarathonActorSupport, MarathonSpec, Mockito }
import org.scalatest.{ BeforeAndAfter, GivenWhenThen, Matchers }

Expand All @@ -18,20 +18,20 @@ class HttpEventStreamHandleActorTest extends MarathonActorSupport
test("A message send to the handle actor will be transferred to the stream handle") {
Given("A handler that will postpone sending until latch is hit")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.countDown())
handle.sendEvent(any[Event]) answers (_ => latch.countDown())

When("The event is send to the actor, the outstanding messages is 1")
handleActor ! EventStreamAttached("remote")

Then("We need to wait for the future to succeed")
awaitCond(latch.getCount == 0)
verify(handle, times(1)).sendEvent(any[String], any[String])
verify(handle, times(1)).sendEvent(any[Event])
}

test("If the consumer is slow and maxOutstanding limit is reached, messages get dropped") {
Given("A handler that will postpone the sending")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.await())
handle.sendEvent(any[Event]) answers (_ => latch.await())
val filter = EventFilter(pattern = "Ignore event.*", occurrences = 1)

When("More than the max size of outstanding events is send to the actor")
Expand All @@ -46,7 +46,7 @@ class HttpEventStreamHandleActorTest extends MarathonActorSupport

test("If the handler throws an EOF exception, the actor stops acting") {
Given("A handler that will postpone the sending")
handle.sendEvent(any[String], any[String]) answers { _ => throw new EOFException() }
handle.sendEvent(any[Event]) answers { _ => throw new EOFException() }
val filter = EventFilter(pattern = "Received EOF.*", occurrences = 1)

When("An event is send to actor")
Expand All @@ -60,7 +60,7 @@ class HttpEventStreamHandleActorTest extends MarathonActorSupport
Given("A handler that will postpone the sending")
val latch = new CountDownLatch(1)
var events = List.empty[String]
handle.sendEvent(any[String], any[String]) answers { args => events ::= args(0).asInstanceOf[String]; latch.await() }
handle.sendEvent(any[Event]) answers { args => events ::= args(0).asInstanceOf[Event].eventType; latch.await() }
handleActor = TestActorRef(Props(
new HttpEventStreamHandleActor(handle, stream, 50)
))
Expand All @@ -74,7 +74,7 @@ class HttpEventStreamHandleActorTest extends MarathonActorSupport
handleActor ! subscribe

Then("The actor stores the events in reverse order")
handleActor.underlyingActor.outstanding should be (subscribe :: detached :: Nil)
handleActor.underlyingActor.outstanding should be (JsonEvent(subscribe) :: JsonEvent(detached) :: Nil)

When("The first event is delivered")
latch.countDown()
Expand Down

0 comments on commit a4cb973

Please sign in to comment.