Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fixes #4948 | Lazy event parsing in HTTP callbacks. (#5114)
Browse files Browse the repository at this point in the history
We can use `jsonString` form `MarathonEvent` to ommit double parsing
evnets.
  • Loading branch information
janisz authored and unterstein committed Feb 7, 2017
1 parent 8b0ee47 commit 21d4d70
Showing 1 changed file with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package mesosphere.marathon.core.event.impl.callback

import akka.actor._
import akka.pattern.ask
import mesosphere.marathon.api.v2.json.Formats._
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.event.impl.callback.HttpEventActor._
import mesosphere.marathon.core.event._
import mesosphere.marathon.core.event.impl.callback.HttpEventActor._
import mesosphere.marathon.core.event.impl.callback.SubscribersKeeperActor.GetSubscribers
import mesosphere.marathon.metrics.{ MetricPrefixes, Metrics }
import play.api.libs.json.JsValue
import spray.client.pipelining.{ sendReceive, _ }
import spray.http.{ HttpRequest, HttpResponse }
import spray.httpx.PlayJsonSupport

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
Expand Down Expand Up @@ -60,7 +57,7 @@ class HttpEventActor(
subscribersKeeper: ActorRef,
metrics: HttpEventActorMetrics,
clock: Clock)
extends Actor with ActorLogging with PlayJsonSupport {
extends Actor with ActorLogging {

implicit val timeout = conf.eventRequestTimeout
def pipeline(implicit ec: ExecutionContext): HttpRequest => Future[HttpResponse] = {
Expand Down Expand Up @@ -96,8 +93,7 @@ class HttpEventActor(
//remove all unsubscribed callback listener
limiter = limiter.filterKeys(subscribers.urls).iterator.toMap.withDefaultValue(NoLimit)
metrics.skippedCallbacks.mark(limited.size)
val jsonEvent = eventToJson(event)
active.foreach(url => Try(post(url, jsonEvent, self)) match {
active.foreach(url => Try(post(url, event, self)) match {
case Success(res) =>
case Failure(ex) =>
log.warning(s"Failed to post $event to $url because ${ex.getClass.getSimpleName}: ${ex.getMessage}")
Expand All @@ -106,18 +102,18 @@ class HttpEventActor(
})
}

def post(url: String, event: JsValue, eventActor: ActorRef): Unit = {
def post(url: String, event: MarathonEvent, eventActor: ActorRef): Unit = {
log.info("Sending POST to:" + url)

metrics.outstandingCallbacks.inc()
val start = clock.now()
val request = Post(url, event)
val request = Post(url, event.jsonString)

val response = pipeline(context.dispatcher)(request)

import context.dispatcher
response.onComplete {
case _ =>
_ =>
metrics.outstandingCallbacks.dec()
metrics.callbackResponseTime.update(start.until(clock.now()))
}
Expand Down

0 comments on commit 21d4d70

Please sign in to comment.