From de2234462e3d4131e63e70b7f58d0bf9b5909ad6 Mon Sep 17 00:00:00 2001 From: Tomasz Janiszewski Date: Wed, 18 Jan 2017 22:22:29 +0100 Subject: [PATCH] Fixes #4948 | Lazy event parsing Change SSE event stream handler to use events that are lazy parsed to JSON. This will reduce CPU time when most events are filtered and/or when more then one subscriper is connected. --- .../marathon/core/event/Events.scala | 5 ++++ .../impl/stream/HttpEventStreamActor.scala | 3 +- .../stream/HttpEventStreamHandleActor.scala | 4 +-- .../impl/stream/HttpEventStreamServlet.scala | 8 ++--- .../impl/stream/HttpEventSSEHandleTest.scala | 30 +++++++++++-------- .../HttpEventStreamHandleActorTest.scala | 12 ++++---- 6 files changed, 35 insertions(+), 27 deletions(-) diff --git a/src/main/scala/mesosphere/marathon/core/event/Events.scala b/src/main/scala/mesosphere/marathon/core/event/Events.scala index e9995a77143..d9fd024b2be 100644 --- a/src/main/scala/mesosphere/marathon/core/event/Events.scala +++ b/src/main/scala/mesosphere/marathon/core/event/Events.scala @@ -1,6 +1,8 @@ package mesosphere.marathon.core.event import akka.event.EventStream +import com.fasterxml.jackson.annotation.JsonIgnore +import mesosphere.marathon.api.v2.json.Formats.eventToJson import mesosphere.marathon.core.condition.Condition import mesosphere.marathon.core.health.HealthCheck import mesosphere.marathon.core.instance.update.InstanceChange @@ -9,12 +11,15 @@ import mesosphere.marathon.core.instance.Instance import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp } import mesosphere.marathon.upgrade.{ DeploymentPlan, DeploymentStep } import org.apache.mesos.{ Protos => Mesos } +import play.api.libs.json.Json import scala.collection.immutable.Seq sealed trait MarathonEvent { val eventType: String val timestamp: String + @JsonIgnore + lazy val jsonString: String = Json.stringify(eventToJson(this)) } // api diff --git a/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamActor.scala b/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamActor.scala index a55f166afe6..b80fdf50d06 100644 --- a/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamActor.scala +++ b/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamActor.scala @@ -3,6 +3,7 @@ package mesosphere.marathon.core.event.impl.stream import akka.actor._ import com.google.inject.Inject import mesosphere.marathon.core.election.{ ElectionService, LocalLeadershipEvent } +import mesosphere.marathon.core.event.MarathonEvent import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._ import mesosphere.marathon.metrics.Metrics.AtomicIntGauge import mesosphere.marathon.metrics.{ MetricPrefixes, Metrics } @@ -16,7 +17,7 @@ import scala.util.Try trait HttpEventStreamHandle { def id: String def remoteAddress: String - def sendEvent(event: String, message: String): Unit + def sendEvent(event: MarathonEvent): Unit def close(): Unit } diff --git a/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActor.scala b/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActor.scala index 7915270bce3..9d959f28e40 100644 --- a/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActor.scala +++ b/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActor.scala @@ -5,11 +5,9 @@ import java.io.EOFException import akka.actor.{ Actor, ActorLogging, Status } import akka.event.EventStream import akka.pattern.pipe -import mesosphere.marathon.api.v2.json.Formats._ import mesosphere.marathon.core.event.impl.stream.HttpEventStreamHandleActor._ import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent } import mesosphere.util.ThreadPoolContext -import play.api.libs.json.Json import scala.concurrent.Future import scala.util.Try @@ -60,7 +58,7 @@ class HttpEventStreamHandleActor( outstanding = List.empty[MarathonEvent] context.become(stashEvents) val sendFuture = Future { - toSend.foreach(event => handle.sendEvent(event.eventType, Json.stringify(eventToJson(event)))) + toSend.foreach(event => handle.sendEvent(event)) WorkDone }(ThreadPoolContext.ioContext) diff --git a/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamServlet.scala b/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamServlet.scala index ac110551724..db955b93b85 100644 --- a/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamServlet.scala +++ b/src/main/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamServlet.scala @@ -5,7 +5,7 @@ import javax.servlet.http.{ Cookie, HttpServletRequest, HttpServletResponse } import akka.actor.ActorRef import mesosphere.marathon.api.RequestFacade -import mesosphere.marathon.core.event.EventConf +import mesosphere.marathon.core.event.{ EventConf, MarathonEvent } import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._ import mesosphere.marathon.plugin.auth._ import mesosphere.marathon.plugin.http.HttpResponse @@ -24,7 +24,7 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends lazy val id: String = UUID.randomUUID().toString - val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet + private val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet def subscribed(eventType: String): Boolean = { subscribedEventTypes.isEmpty || subscribedEventTypes.contains(eventType) @@ -34,8 +34,8 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends override def close(): Unit = emitter.close() - override def sendEvent(event: String, message: String): Unit = { - if (subscribed(event)) blocking(emitter.event(event, message)) + override def sendEvent(event: MarathonEvent): Unit = { + if (subscribed(event.eventType)) blocking(emitter.event(event.eventType, event.jsonString)) } override def toString: String = s"HttpEventSSEHandle($id on $remoteAddress on event types from $subscribedEventTypes)" diff --git a/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventSSEHandleTest.scala b/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventSSEHandleTest.scala index f6a1d0da67a..8555b60313d 100644 --- a/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventSSEHandleTest.scala +++ b/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventSSEHandleTest.scala @@ -5,36 +5,37 @@ import java.util.Collections import javax.servlet.http.HttpServletRequest import mesosphere.UnitTest +import mesosphere.marathon.core.event.{ Subscribe, Unsubscribe } import mesosphere.marathon.stream._ import org.eclipse.jetty.servlets.EventSource.Emitter class HttpEventSSEHandleTest extends UnitTest { "HttpEventSSEHandle" should { "events should be filtered" in { - Given("An emiter") + Given("An emitter") val emitter = mock[Emitter] Given("An request with params") val req = mock[HttpServletRequest] - req.getParameterMap returns Map("event_type" -> Array("xyz")) + req.getParameterMap returns Map("event_type" -> Array(unsubscribe.eventType)) Given("handler for request is created") val handle = new HttpEventSSEHandle(req, emitter) When("Want to sent unwanted event") - handle.sendEvent("any event", "") + handle.sendEvent(subscribed) Then("event should NOT be sent") - verify(emitter, never).event("any event", "") + verify(emitter, never).event(eq(subscribed.eventType), any[String]) When("Want to sent subscribed event") - handle.sendEvent("xyz", "") + handle.sendEvent(unsubscribe) Then("event should be sent") - verify(emitter).event("xyz", "") + verify(emitter).event(eq(unsubscribe.eventType), any[String]) } "events should NOT be filtered" in { - Given("An emiter") + Given("An emitter") val emitter = mock[Emitter] Given("An request without params") @@ -45,16 +46,19 @@ class HttpEventSSEHandleTest extends UnitTest { val handle = new HttpEventSSEHandle(req, emitter) When("Want to sent event") - handle.sendEvent("any event", "") - Then("event should NOT be sent") + handle.sendEvent(subscribed) - verify(emitter).event("any event", "") - When("Want to sent event") + Then("event should be sent") + verify(emitter).event(eq(subscribed.eventType), any[String]) - handle.sendEvent("xyz", "") + When("Want to sent event") + handle.sendEvent(unsubscribe) Then("event should be sent") - verify(emitter).event("xyz", "") + verify(emitter).event(eq(unsubscribe.eventType), any[String]) } } + + val subscribed = Subscribe("client IP", "callback URL") + val unsubscribe = Unsubscribe("client IP", "callback URL") } diff --git a/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActorTest.scala b/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActorTest.scala index 57c07179211..cc77b1a9776 100644 --- a/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActorTest.scala +++ b/src/test/scala/mesosphere/marathon/core/event/impl/stream/HttpEventStreamHandleActorTest.scala @@ -8,7 +8,7 @@ import akka.actor.Props import akka.event.EventStream import akka.testkit.{ EventFilter, ImplicitSender, TestActorRef } import mesosphere.AkkaUnitTest -import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, Subscribe } +import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent, Subscribe } import scala.concurrent.duration._ @@ -24,20 +24,20 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender { "A message send to the handle actor will be transferred to the stream handle" in new Fixture { Given("A handler that will postpone sending until latch is hit") val latch = new CountDownLatch(1) - handle.sendEvent(any[String], any[String]) answers (_ => latch.countDown()) + handle.sendEvent(any[MarathonEvent]) answers (_ => latch.countDown()) When("The event is send to the actor, the outstanding messages is 1") handleActor ! EventStreamAttached("remote") Then("We need to wait for the future to succeed") awaitCond(latch.getCount == 0) - verify(handle, times(1)).sendEvent(any[String], any[String]) + verify(handle, times(1)).sendEvent(any[MarathonEvent]) } "If the consumer is slow and maxOutstanding limit is reached, messages get dropped" in new Fixture { Given("A handler that will postpone the sending") val latch = new CountDownLatch(1) - handle.sendEvent(any[String], any[String]) answers (_ => latch.await()) + handle.sendEvent(any[MarathonEvent]) answers (_ => latch.await()) val filter = EventFilter(pattern = "Ignore event.*", occurrences = 1) When("More than the max size of outstanding events is send to the actor") @@ -52,7 +52,7 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender { "If the handler throws an EOF exception, the actor stops acting" in new Fixture { Given("A handler that will postpone the sending") - handle.sendEvent(any[String], any[String]) answers { _ => throw new EOFException() } + handle.sendEvent(any[MarathonEvent]) answers { _ => throw new EOFException() } val filter = EventFilter(pattern = "Received EOF.*", occurrences = 1) When("An event is send to actor") @@ -68,7 +68,7 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender { var events = List.empty[String] val handle = mock[HttpEventStreamHandle] val stream = mock[EventStream] - handle.sendEvent(any[String], any[String]) answers { args => events ::= args(0).asInstanceOf[String]; latch.await() } + handle.sendEvent(any[MarathonEvent]) answers { args => events ::= args(0).asInstanceOf[MarathonEvent].eventType; latch.await() } val handleActor: TestActorRef[HttpEventStreamHandleActor] = TestActorRef(Props( new HttpEventStreamHandleActor(handle, stream, 50) ))