Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Fixes #4948 | Lazy event parsing #4986

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/scala/mesosphere/marathon/core/event/Events.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mesosphere.marathon.core.event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you double package this too?


import akka.event.EventStream
import com.fasterxml.jackson.annotation.JsonIgnore
import mesosphere.marathon.api.v2.json.Formats.eventToJson
import mesosphere.marathon.core.condition.Condition
import mesosphere.marathon.core.health.HealthCheck
import mesosphere.marathon.core.instance.update.InstanceChange
Expand All @@ -9,12 +11,15 @@ import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp }
import mesosphere.marathon.upgrade.{ DeploymentPlan, DeploymentStep }
import org.apache.mesos.{ Protos => Mesos }
import play.api.libs.json.Json

import scala.collection.immutable.Seq

sealed trait MarathonEvent {
val eventType: String
val timestamp: String
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could have all of these have lazy val jsonString....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry I don't understand. If you are referring to put lazy val jsonString into MarathonEvent Then we need to ignore this filed during serialization. I created new class to distinct data from their view. We use JSON now but we can use Protobuf or something else in the future. Should I move JSONEvent to SSE Actor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for forgetting about this... since we control the serialization/deserialization of these, we could have the jsonString as a member and not serialize the field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@JsonIgnore
lazy val jsonString: String = Json.stringify(eventToJson(this))
}

// api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mesosphere.marathon.core.event.impl.stream
import akka.actor._
import com.google.inject.Inject
import mesosphere.marathon.core.election.{ ElectionService, LocalLeadershipEvent }
import mesosphere.marathon.core.event.MarathonEvent
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.metrics.Metrics.AtomicIntGauge
import mesosphere.marathon.metrics.{ MetricPrefixes, Metrics }
Expand All @@ -16,7 +17,7 @@ import scala.util.Try
trait HttpEventStreamHandle {
def id: String
def remoteAddress: String
def sendEvent(event: String, message: String): Unit
def sendEvent(event: MarathonEvent): Unit
def close(): Unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import java.io.EOFException
import akka.actor.{ Actor, ActorLogging, Status }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the package into a double declaration?

e.g.

package mesosphere.marathon
package core.event.impl.stream```

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

import akka.event.EventStream
import akka.pattern.pipe
import mesosphere.marathon.api.v2.json.Formats._
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamHandleActor._
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent }
import mesosphere.util.ThreadPoolContext
import play.api.libs.json.Json

import scala.concurrent.Future
import scala.util.Try
Expand Down Expand Up @@ -60,7 +58,7 @@ class HttpEventStreamHandleActor(
outstanding = List.empty[MarathonEvent]
context.become(stashEvents)
val sendFuture = Future {
toSend.foreach(event => handle.sendEvent(event.eventType, Json.stringify(eventToJson(event))))
toSend.foreach(event => handle.sendEvent(event))
WorkDone
}(ThreadPoolContext.ioContext)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import javax.servlet.http.{ Cookie, HttpServletRequest, HttpServletResponse }

import akka.actor.ActorRef
import mesosphere.marathon.api.RequestFacade
import mesosphere.marathon.core.event.EventConf
import mesosphere.marathon.core.event.{ EventConf, MarathonEvent }
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.plugin.auth._
import mesosphere.marathon.plugin.http.HttpResponse
Expand All @@ -24,7 +24,7 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

lazy val id: String = UUID.randomUUID().toString

val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet
private val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet

def subscribed(eventType: String): Boolean = {
subscribedEventTypes.isEmpty || subscribedEventTypes.contains(eventType)
Expand All @@ -34,8 +34,8 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

override def close(): Unit = emitter.close()

override def sendEvent(event: String, message: String): Unit = {
if (subscribed(event)) blocking(emitter.event(event, message))
override def sendEvent(event: MarathonEvent): Unit = {
if (subscribed(event.eventType)) blocking(emitter.event(event.eventType, event.jsonString))
}

override def toString: String = s"HttpEventSSEHandle($id on $remoteAddress on event types from $subscribedEventTypes)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,37 @@ import java.util.Collections
import javax.servlet.http.HttpServletRequest

import mesosphere.UnitTest
import mesosphere.marathon.core.event.{ Subscribe, Unsubscribe }
import mesosphere.marathon.stream._
import org.eclipse.jetty.servlets.EventSource.Emitter

class HttpEventSSEHandleTest extends UnitTest {
"HttpEventSSEHandle" should {
"events should be filtered" in {
Given("An emiter")
Given("An emitter")
val emitter = mock[Emitter]
Given("An request with params")
val req = mock[HttpServletRequest]
req.getParameterMap returns Map("event_type" -> Array("xyz"))
req.getParameterMap returns Map("event_type" -> Array(unsubscribe.eventType))

Given("handler for request is created")
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent unwanted event")
handle.sendEvent("any event", "")
handle.sendEvent(subscribed)

Then("event should NOT be sent")
verify(emitter, never).event("any event", "")
verify(emitter, never).event(eq(subscribed.eventType), any[String])

When("Want to sent subscribed event")
handle.sendEvent("xyz", "")
handle.sendEvent(unsubscribe)

Then("event should be sent")
verify(emitter).event("xyz", "")
verify(emitter).event(eq(unsubscribe.eventType), any[String])
}

"events should NOT be filtered" in {
Given("An emiter")
Given("An emitter")
val emitter = mock[Emitter]

Given("An request without params")
Expand All @@ -45,16 +46,19 @@ class HttpEventSSEHandleTest extends UnitTest {
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent event")
handle.sendEvent("any event", "")
Then("event should NOT be sent")
handle.sendEvent(subscribed)

verify(emitter).event("any event", "")
When("Want to sent event")
Then("event should be sent")
verify(emitter).event(eq(subscribed.eventType), any[String])

handle.sendEvent("xyz", "")
When("Want to sent event")
handle.sendEvent(unsubscribe)

Then("event should be sent")
verify(emitter).event("xyz", "")
verify(emitter).event(eq(unsubscribe.eventType), any[String])
}
}

val subscribed = Subscribe("client IP", "callback URL")
val unsubscribe = Unsubscribe("client IP", "callback URL")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.actor.Props
import akka.event.EventStream
import akka.testkit.{ EventFilter, ImplicitSender, TestActorRef }
import mesosphere.AkkaUnitTest
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, Subscribe }
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent, Subscribe }

import scala.concurrent.duration._

Expand All @@ -24,20 +24,20 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender {
"A message send to the handle actor will be transferred to the stream handle" in new Fixture {
Given("A handler that will postpone sending until latch is hit")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.countDown())
handle.sendEvent(any[MarathonEvent]) answers (_ => latch.countDown())

When("The event is send to the actor, the outstanding messages is 1")
handleActor ! EventStreamAttached("remote")

Then("We need to wait for the future to succeed")
awaitCond(latch.getCount == 0)
verify(handle, times(1)).sendEvent(any[String], any[String])
verify(handle, times(1)).sendEvent(any[MarathonEvent])
}

"If the consumer is slow and maxOutstanding limit is reached, messages get dropped" in new Fixture {
Given("A handler that will postpone the sending")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.await())
handle.sendEvent(any[MarathonEvent]) answers (_ => latch.await())
val filter = EventFilter(pattern = "Ignore event.*", occurrences = 1)

When("More than the max size of outstanding events is send to the actor")
Expand All @@ -52,7 +52,7 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender {

"If the handler throws an EOF exception, the actor stops acting" in new Fixture {
Given("A handler that will postpone the sending")
handle.sendEvent(any[String], any[String]) answers { _ => throw new EOFException() }
handle.sendEvent(any[MarathonEvent]) answers { _ => throw new EOFException() }
val filter = EventFilter(pattern = "Received EOF.*", occurrences = 1)

When("An event is send to actor")
Expand All @@ -68,7 +68,7 @@ class HttpEventStreamHandleActorTest extends AkkaUnitTest with ImplicitSender {
var events = List.empty[String]
val handle = mock[HttpEventStreamHandle]
val stream = mock[EventStream]
handle.sendEvent(any[String], any[String]) answers { args => events ::= args(0).asInstanceOf[String]; latch.await() }
handle.sendEvent(any[MarathonEvent]) answers { args => events ::= args(0).asInstanceOf[MarathonEvent].eventType; latch.await() }
val handleActor: TestActorRef[HttpEventStreamHandleActor] = TestActorRef(Props(
new HttpEventStreamHandleActor(handle, stream, 50)
))
Expand Down