Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fixes #4948 | Lazy event parsing
Browse files Browse the repository at this point in the history
Change SSE event stream handler to use events that are
lazy parsed to JSON. This will reduce CPU time when most events
are filtered and/or when more then one subscriper is connected.
  • Loading branch information
janisz committed Jan 30, 2017
1 parent 9592e53 commit de22344
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 27 deletions.
5 changes: 5 additions & 0 deletions src/main/scala/mesosphere/marathon/core/event/Events.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mesosphere.marathon.core.event

import akka.event.EventStream
import com.fasterxml.jackson.annotation.JsonIgnore
import mesosphere.marathon.api.v2.json.Formats.eventToJson
import mesosphere.marathon.core.condition.Condition
import mesosphere.marathon.core.health.HealthCheck
import mesosphere.marathon.core.instance.update.InstanceChange
Expand All @@ -9,12 +11,15 @@ import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp }
import mesosphere.marathon.upgrade.{ DeploymentPlan, DeploymentStep }
import org.apache.mesos.{ Protos => Mesos }
import play.api.libs.json.Json

import scala.collection.immutable.Seq

sealed trait MarathonEvent {
val eventType: String
val timestamp: String
@JsonIgnore
lazy val jsonString: String = Json.stringify(eventToJson(this))
}

// api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mesosphere.marathon.core.event.impl.stream
import akka.actor._
import com.google.inject.Inject
import mesosphere.marathon.core.election.{ ElectionService, LocalLeadershipEvent }
import mesosphere.marathon.core.event.MarathonEvent
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.metrics.Metrics.AtomicIntGauge
import mesosphere.marathon.metrics.{ MetricPrefixes, Metrics }
Expand All @@ -16,7 +17,7 @@ import scala.util.Try
trait HttpEventStreamHandle {
def id: String
def remoteAddress: String
def sendEvent(event: String, message: String): Unit
def sendEvent(event: MarathonEvent): Unit
def close(): Unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import java.io.EOFException
import akka.actor.{ Actor, ActorLogging, Status }
import akka.event.EventStream
import akka.pattern.pipe
import mesosphere.marathon.api.v2.json.Formats._
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamHandleActor._
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent }
import mesosphere.util.ThreadPoolContext
import play.api.libs.json.Json

import scala.concurrent.Future
import scala.util.Try
Expand Down Expand Up @@ -60,7 +58,7 @@ class HttpEventStreamHandleActor(
outstanding = List.empty[MarathonEvent]
context.become(stashEvents)
val sendFuture = Future {
toSend.foreach(event => handle.sendEvent(event.eventType, Json.stringify(eventToJson(event))))
toSend.foreach(event => handle.sendEvent(event))
WorkDone
}(ThreadPoolContext.ioContext)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import javax.servlet.http.{ Cookie, HttpServletRequest, HttpServletResponse }

import akka.actor.ActorRef
import mesosphere.marathon.api.RequestFacade
import mesosphere.marathon.core.event.EventConf
import mesosphere.marathon.core.event.{ EventConf, MarathonEvent }
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.plugin.auth._
import mesosphere.marathon.plugin.http.HttpResponse
Expand All @@ -24,7 +24,7 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

lazy val id: String = UUID.randomUUID().toString

val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet
private val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet

def subscribed(eventType: String): Boolean = {
subscribedEventTypes.isEmpty || subscribedEventTypes.contains(eventType)
Expand All @@ -34,8 +34,8 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

override def close(): Unit = emitter.close()

override def sendEvent(event: String, message: String): Unit = {
if (subscribed(event)) blocking(emitter.event(event, message))
override def sendEvent(event: MarathonEvent): Unit = {
if (subscribed(event.eventType)) blocking(emitter.event(event.eventType, event.jsonString))
}

override def toString: String = s"HttpEventSSEHandle($id on $remoteAddress on event types from $subscribedEventTypes)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,37 @@ import java.util.Collections
import javax.servlet.http.HttpServletRequest

import mesosphere.UnitTest
import mesosphere.marathon.core.event.{ Subscribe, Unsubscribe }
import mesosphere.marathon.stream._
import org.eclipse.jetty.servlets.EventSource.Emitter

class HttpEventSSEHandleTest extends UnitTest {
"HttpEventSSEHandle" should {
"events should be filtered" in {
Given("An emiter")
Given("An emitter")
val emitter = mock[Emitter]
Given("An request with params")
val req = mock[HttpServletRequest]
req.getParameterMap returns Map("event_type" -> Array("xyz"))
req.getParameterMap returns Map("event_type" -> Array(unsubscribe.eventType))

Given("handler for request is created")
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent unwanted event")
handle.sendEvent("any event", "")
handle.sendEvent(subscribed)

Then("event should NOT be sent")
verify(emitter, never).event("any event", "")
verify(emitter, never).event(eq(subscribed.eventType), any[String])

When("Want to sent subscribed event")
handle.sendEvent("xyz", "")
handle.sendEvent(unsubscribe)

Then("event should be sent")
verify(emitter).event("xyz", "")
verify(emitter).event(eq(unsubscribe.eventType), any[String])
}

"events should NOT be filtered" in {
Given("An emiter")
Given("An emitter")
val emitter = mock[Emitter]

Given("An request without params")
Expand All @@ -45,16 +46,19 @@ class HttpEventSSEHandleTest extends UnitTest {
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent event")
handle.sendEvent("any event", "")
Then("event should NOT be sent")
handle.sendEvent(subscribed)

verify(emitter).event("any event", "")
When("Want to sent event")
Then("event should be sent")
verify(emitter).event(eq(subscribed.eventType), any[String])

handle.sendEvent("xyz", "")
When("Want to sent event")
handle.sendEvent(unsubscribe)

Then("event should be sent")
verify(emitter).event("xyz", "")
verify(emitter).event(eq(unsubscribe.eventType), any[String])
}
}

val subscribed = Subscribe("client IP", "callback URL")
val unsubscribe = Unsubscribe("client IP", "callback URL")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.actor.Props
import akka.event.EventStream
import akka.testkit.{ EventFilter, ImplicitSender, TestActorRef }
import mesosphere.AkkaUnitTest
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, Subscribe }
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent, Subscribe }

import scala.concurrent.duration._

Expand All @@ -24,20 +24,20 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender {
"A message send to the handle actor will be transferred to the stream handle" in new Fixture {
Given("A handler that will postpone sending until latch is hit")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.countDown())
handle.sendEvent(any[MarathonEvent]) answers (_ => latch.countDown())

When("The event is send to the actor, the outstanding messages is 1")
handleActor ! EventStreamAttached("remote")

Then("We need to wait for the future to succeed")
awaitCond(latch.getCount == 0)
verify(handle, times(1)).sendEvent(any[String], any[String])
verify(handle, times(1)).sendEvent(any[MarathonEvent])
}

"If the consumer is slow and maxOutstanding limit is reached, messages get dropped" in new Fixture {
Given("A handler that will postpone the sending")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.await())
handle.sendEvent(any[MarathonEvent]) answers (_ => latch.await())
val filter = EventFilter(pattern = "Ignore event.*", occurrences = 1)

When("More than the max size of outstanding events is send to the actor")
Expand All @@ -52,7 +52,7 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender {

"If the handler throws an EOF exception, the actor stops acting" in new Fixture {
Given("A handler that will postpone the sending")
handle.sendEvent(any[String], any[String]) answers { _ => throw new EOFException() }
handle.sendEvent(any[MarathonEvent]) answers { _ => throw new EOFException() }
val filter = EventFilter(pattern = "Received EOF.*", occurrences = 1)

When("An event is send to actor")
Expand All @@ -68,7 +68,7 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender {
var events = List.empty[String]
val handle = mock[HttpEventStreamHandle]
val stream = mock[EventStream]
handle.sendEvent(any[String], any[String]) answers { args => events ::= args(0).asInstanceOf[String]; latch.await() }
handle.sendEvent(any[MarathonEvent]) answers { args => events ::= args(0).asInstanceOf[MarathonEvent].eventType; latch.await() }
val handleActor: TestActorRef[HttpEventStreamHandleActor] = TestActorRef(Props(
new HttpEventStreamHandleActor(handle, stream, 50)
))
Expand Down

0 comments on commit de22344

Please sign in to comment.