Skip to content

Commit

Permalink
Add constraints
Browse files Browse the repository at this point in the history
This PR adds support for `constraints` as described in mesos#256.

The format to define constraints of a job is the same as in Marathon.
Note that currently only the `CLUSTER` constraint has been implemented.
  • Loading branch information
wndhydrnt committed Apr 20, 2015
1 parent d2bbe99 commit ee715a4
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 9 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ you can also use a url in the command field, if your mesos was compiled with cUR
| container | This contains the subfields for the container, type (req), image (req), network (optional) and volumes (optional). | - |
| dataJob | Toggles whether the job tracks data (number of elements processed) | `false` |
| environmentVariables| An array of environment variables passed to the Mesos executor. For Docker containers, these are also passed to Docker using the -e flag. | - |
| constraints | Control where jobs run. Each constraint is compared against the [attributes of a Mesos slave](http://mesos.apache.org/documentation/attributes-resources/). See [Constraints](#constraints). | - |

### Sample Job

Expand Down Expand Up @@ -458,6 +459,18 @@ you can also use a url in the command field, if your mesos was compiled with cUR
}
```

## Constraints

### CLUSTER constraint

Schedule a job on nodes that share a common attribute.

```json
...
"constraints": [["rack", "CLUSTER", "rack-1"]],
...
```

## Job Management

For larger installations, the web UI may be insufficient for managing jobs. At
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.mesos.chronos.scheduler.jobs

import org.apache.mesos.chronos.scheduler.jobs.constraints.Constraint
import org.apache.mesos.chronos.utils.JobDeserializer
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
Expand Down Expand Up @@ -77,6 +78,8 @@ trait BaseJob {
def softError: Boolean = false

def dataProcessingJobType: Boolean = false

def constraints: Seq[Constraint] = List()
}

@JsonDeserialize(using = classOf[JobDeserializer])
Expand Down Expand Up @@ -110,7 +113,8 @@ case class ScheduleBasedJob(
@JsonProperty override val shell: Boolean = true,
@JsonProperty override val arguments: Seq[String] = List(),
@JsonProperty override val softError: Boolean = false,
override val dataProcessingJobType: Boolean = false)
override val dataProcessingJobType: Boolean = false,
@JsonProperty override val constraints: Seq[Constraint] = List())
extends BaseJob


Expand Down Expand Up @@ -144,5 +148,6 @@ case class DependencyBasedJob(
@JsonProperty override val shell: Boolean = true,
@JsonProperty override val arguments: Seq[String] = List(),
@JsonProperty override val softError: Boolean = false,
override val dataProcessingJobType: Boolean = false)
override val dataProcessingJobType: Boolean = false,
@JsonProperty override val constraints: Seq[Constraint] = List())
extends BaseJob
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.apache.mesos.chronos.scheduler.jobs.constraints

import org.apache.mesos.Protos

case class ClusterConstraint(attribute: String, value: String) extends Constraint {

def matches(attributes: Seq[Protos.Attribute]): Boolean = {
attributes.foreach { a =>
if (a.getName == attribute && a.getText.getValue == value) {
return true
}
}

false
}
}

object ClusterConstraint {
val OPERATOR = "CLUSTER"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.mesos.chronos.scheduler.jobs.constraints

import org.apache.mesos.Protos

trait Constraint {
def matches(attributes: Seq[Protos.Attribute]): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.logging.Logger

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs._
import org.apache.mesos.chronos.scheduler.jobs.constraints.Constraint
import org.apache.mesos.chronos.utils.JobDeserializer
import com.google.inject.Inject
import mesosphere.mesos.util.FrameworkIdUtil
Expand Down Expand Up @@ -99,6 +100,16 @@ import scala.concurrent.ExecutionContext.Implicits.global
def generateLaunchableTasks(offerResources: mutable.HashMap[Offer, Resources]): mutable.Buffer[(String, BaseJob, Offer)] = {
val tasks = mutable.Buffer[(String, BaseJob, Offer)]()

def checkConstraints(attributes: Seq[Protos.Attribute], constraints: Seq[Constraint]): Boolean = {
constraints.foreach { c =>
if (!c.matches(attributes)) {
return false
}
}

true
}

def generate() {
taskManager.getTask match {
case None => log.info("No tasks scheduled or next task has been disabled.\n")
Expand All @@ -115,7 +126,9 @@ import scala.concurrent.ExecutionContext.Implicits.global
generate()
case None =>
val neededResources = new Resources(job)
offerResources.toIterator.find(_._2.canSatisfy(neededResources)) match {
offerResources.toIterator.find { ors =>
ors._2.canSatisfy(neededResources) && checkConstraints(ors._1.getAttributesList.asScala, job.constraints)
} match {
case Some((offer, resources)) =>
// Subtract this job's resource requirements from the remaining available resources in this offer.
resources -= neededResources
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.apache.mesos.chronos.utils

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, DependencyBasedJob, DockerContainer, EnvironmentVariable, NetworkMode, ScheduleBasedJob, Volume, VolumeMode}
import org.apache.mesos.chronos.scheduler.jobs._
import org.apache.mesos.chronos.scheduler.jobs.constraints._
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
Expand Down Expand Up @@ -165,6 +166,17 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
container = DockerContainer(containerNode.get("image").asText, volumes, networkMode)
}

val constraints = scala.collection.mutable.ListBuffer[Constraint]()
if (node.has("constraints")) {
for (c <- node.path("constraints")) {
c.get(1).asText match {
case ClusterConstraint.OPERATOR =>
constraints.add(ClusterConstraint(c.get(0).asText, c.get(2).asText))
case _ =>
}
}
}

var parentList = scala.collection.mutable.ListBuffer[String]()
if (node.has("parents")) {
for (parent <- node.path("parents")) {
Expand All @@ -177,7 +189,8 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority,
runAsUser = runAsUser, container = container, environmentVariables = environmentVariables, shell = shell,
arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType)
arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType,
constraints = constraints)
} else if (node.has("schedule")) {
val scheduleTimeZone = if (node.has("scheduleTimeZone")) node.get("scheduleTimeZone").asText else ""
new ScheduleBasedJob(node.get("schedule").asText, name = name, command = command,
Expand All @@ -188,7 +201,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority,
runAsUser = runAsUser, container = container, scheduleTimeZone = scheduleTimeZone,
environmentVariables = environmentVariables, shell = shell, arguments = arguments, softError = softError,
dataProcessingJobType = dataProcessingJobType)
dataProcessingJobType = dataProcessingJobType, constraints = constraints)
} else {
/* schedule now */
new ScheduleBasedJob("R1//PT24H", name = name, command = command, epsilon = epsilon, successCount = successCount,
Expand All @@ -197,7 +210,8 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority,
runAsUser = runAsUser, container = container, environmentVariables = environmentVariables, shell = shell,
arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType)
arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType,
constraints = constraints)
}
}
}
15 changes: 15 additions & 0 deletions src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.mesos.chronos.utils

import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, DependencyBasedJob, ScheduleBasedJob}
import org.apache.mesos.chronos.scheduler.jobs.constraints.ClusterConstraint
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}

Expand Down Expand Up @@ -136,6 +137,20 @@ class JobSerializer extends JsonSerializer[BaseJob] {
json.writeEndObject()
}

json.writeFieldName("constraints")
json.writeStartArray()
baseJob.constraints.foreach { v =>
json.writeStartArray()
v match {
case ClusterConstraint(attribute, value) =>
json.writeString(attribute)
json.writeString(ClusterConstraint.OPERATOR)
json.writeString(value)
}
json.writeEndArray()
}
json.writeEndArray()

baseJob match {
case depJob: DependencyBasedJob =>
json.writeFieldName("parents")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.mesos.chronos.scheduler.api

import org.apache.mesos.chronos.scheduler.jobs.constraints.ClusterConstraint
import org.apache.mesos.chronos.scheduler.jobs.{DependencyBasedJob, DockerContainer, EnvironmentVariable, ScheduleBasedJob, _}
import org.apache.mesos.chronos.utils.{JobDeserializer, JobSerializer}
import com.fasterxml.jackson.databind.ObjectMapper
Expand Down Expand Up @@ -33,10 +34,12 @@ class SerDeTest extends SpecificationWithJUnit {
"-testOne"
)

val constraints = List(ClusterConstraint("rack", "rack-1"))

val a = new DependencyBasedJob(Set("B", "C", "D", "E"), "A", "noop", Minutes.minutes(5).toPeriod, 10L,
20L, "fooexec", "fooflags", 7, "[email protected]", "Foo", "Test dependency-based job", "TODAY",
"YESTERDAY", true, container = container, environmentVariables = environmentVariables,
shell = false, arguments = arguments, softError = true)
shell = false, arguments = arguments, softError = true, constraints = constraints)

val aStr = objectMapper.writeValueAsString(a)
val aCopy = objectMapper.readValue(aStr, classOf[DependencyBasedJob])
Expand Down Expand Up @@ -67,10 +70,12 @@ class SerDeTest extends SpecificationWithJUnit {
"-testOne"
)

val constraints = List(ClusterConstraint("rack", "rack-1"))

val a = new ScheduleBasedJob("FOO/BAR/BAM", "A", "noop", Minutes.minutes(5).toPeriod, 10L, 20L,
"fooexec", "fooflags", 7, "[email protected]", "Foo", "Test schedule-based job", "TODAY",
"YESTERDAY", true, container = container, environmentVariables = environmentVariables,
shell = true, arguments = arguments, softError = true)
shell = true, arguments = arguments, softError = true, constraints = constraints)

val aStr = objectMapper.writeValueAsString(a)
val aCopy = objectMapper.readValue(aStr, classOf[ScheduleBasedJob])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.mesos.chronos.scheduler.jobs.constraints

import org.apache.mesos.Protos
import org.specs2.mutable.SpecificationWithJUnit

class ClusterConstraintSpec extends SpecificationWithJUnit {

def createAttribute(name: String, value: String): Protos.Attribute = {
Protos.Attribute.newBuilder()
.setName(name)
.setText(Protos.Value.Text.newBuilder().setValue(value))
.setType(Protos.Value.Type.TEXT)
.build()
}

"matches attributes" in {
val attributes = List(createAttribute("dc", "north"), createAttribute("rack", "rack-1"))

val constraint = ClusterConstraint("rack", "rack-1")

constraint.matches(attributes) must_== true
}

"does not match attributes" in {
val attributes = List(createAttribute("dc", "north"), createAttribute("rack", "rack-1"))

val constraint = ClusterConstraint("rack", "rack-2")

constraint.matches(attributes) must_== false
}

}

0 comments on commit ee715a4

Please sign in to comment.