From ee715a424b3bc941dd76ba9b52c51ddeeff69e3d Mon Sep 17 00:00:00 2001 From: wndhydrnt Date: Sat, 18 Apr 2015 16:03:33 +0200 Subject: [PATCH] Add constraints This PR adds support for `constraints` as described in #256. The format to define constraints of a job is the same as in Marathon. Note that currently only the `CLUSTER` constraint has been implemented. --- README.md | 13 ++++++++ .../mesos/chronos/scheduler/jobs/Jobs.scala | 9 ++++-- .../jobs/constraints/ClusterConstraint.scala | 20 ++++++++++++ .../jobs/constraints/Constraint.scala | 7 ++++ .../scheduler/mesos/MesosJobFramework.scala | 15 ++++++++- .../mesos/chronos/utils/JobDeserializer.scala | 22 ++++++++++--- .../mesos/chronos/utils/JobSerializer.scala | 15 +++++++++ .../chronos/scheduler/api/SerDeTest.scala | 9 ++++-- .../constraints/ClusterConstraintSpec.scala | 32 +++++++++++++++++++ 9 files changed, 133 insertions(+), 9 deletions(-) create mode 100644 src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraint.scala create mode 100644 src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/Constraint.scala create mode 100644 src/test/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraintSpec.scala diff --git a/README.md b/README.md index 1975d4908..0a83f7c4b 100644 --- a/README.md +++ b/README.md @@ -422,6 +422,7 @@ you can also use a url in the command field, if your mesos was compiled with cUR | container | This contains the subfields for the container, type (req), image (req), network (optional) and volumes (optional). | - | | dataJob | Toggles whether the job tracks data (number of elements processed) | `false` | | environmentVariables| An array of environment variables passed to the Mesos executor. For Docker containers, these are also passed to Docker using the -e flag. | - | +| constraints | Control where jobs run. Each constraint is compared against the [attributes of a Mesos slave](http://mesos.apache.org/documentation/attributes-resources/). See [Constraints](#constraints). | - | ### Sample Job @@ -458,6 +459,18 @@ you can also use a url in the command field, if your mesos was compiled with cUR } ``` +## Constraints + +### CLUSTER constraint + +Schedule a job on nodes that share a common attribute. + +```json +... +"constraints": [["rack", "CLUSTER", "rack-1"]], +... +``` + ## Job Management For larger installations, the web UI may be insufficient for managing jobs. At diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala index 075354377..c12cfd0be 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala @@ -1,5 +1,6 @@ package org.apache.mesos.chronos.scheduler.jobs +import org.apache.mesos.chronos.scheduler.jobs.constraints.Constraint import org.apache.mesos.chronos.utils.JobDeserializer import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.annotation.JsonDeserialize @@ -77,6 +78,8 @@ trait BaseJob { def softError: Boolean = false def dataProcessingJobType: Boolean = false + + def constraints: Seq[Constraint] = List() } @JsonDeserialize(using = classOf[JobDeserializer]) @@ -110,7 +113,8 @@ case class ScheduleBasedJob( @JsonProperty override val shell: Boolean = true, @JsonProperty override val arguments: Seq[String] = List(), @JsonProperty override val softError: Boolean = false, - override val dataProcessingJobType: Boolean = false) + override val dataProcessingJobType: Boolean = false, + @JsonProperty override val constraints: Seq[Constraint] = List()) extends BaseJob @@ -144,5 +148,6 @@ case class DependencyBasedJob( @JsonProperty override val shell: Boolean = true, @JsonProperty override val arguments: Seq[String] = List(), @JsonProperty override val softError: Boolean = false, - override val dataProcessingJobType: Boolean = false) + override val dataProcessingJobType: Boolean = false, + @JsonProperty override val constraints: Seq[Constraint] = List()) extends BaseJob diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraint.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraint.scala new file mode 100644 index 000000000..ad91da6c4 --- /dev/null +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraint.scala @@ -0,0 +1,20 @@ +package org.apache.mesos.chronos.scheduler.jobs.constraints + +import org.apache.mesos.Protos + +case class ClusterConstraint(attribute: String, value: String) extends Constraint { + + def matches(attributes: Seq[Protos.Attribute]): Boolean = { + attributes.foreach { a => + if (a.getName == attribute && a.getText.getValue == value) { + return true + } + } + + false + } +} + +object ClusterConstraint { + val OPERATOR = "CLUSTER" +} diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/Constraint.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/Constraint.scala new file mode 100644 index 000000000..b3128aa00 --- /dev/null +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/Constraint.scala @@ -0,0 +1,7 @@ +package org.apache.mesos.chronos.scheduler.jobs.constraints + +import org.apache.mesos.Protos + +trait Constraint { + def matches(attributes: Seq[Protos.Attribute]): Boolean +} diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala index fb3129dcc..f94360905 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala @@ -4,6 +4,7 @@ import java.util.logging.Logger import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration import org.apache.mesos.chronos.scheduler.jobs._ +import org.apache.mesos.chronos.scheduler.jobs.constraints.Constraint import org.apache.mesos.chronos.utils.JobDeserializer import com.google.inject.Inject import mesosphere.mesos.util.FrameworkIdUtil @@ -99,6 +100,16 @@ import scala.concurrent.ExecutionContext.Implicits.global def generateLaunchableTasks(offerResources: mutable.HashMap[Offer, Resources]): mutable.Buffer[(String, BaseJob, Offer)] = { val tasks = mutable.Buffer[(String, BaseJob, Offer)]() + def checkConstraints(attributes: Seq[Protos.Attribute], constraints: Seq[Constraint]): Boolean = { + constraints.foreach { c => + if (!c.matches(attributes)) { + return false + } + } + + true + } + def generate() { taskManager.getTask match { case None => log.info("No tasks scheduled or next task has been disabled.\n") @@ -115,7 +126,9 @@ import scala.concurrent.ExecutionContext.Implicits.global generate() case None => val neededResources = new Resources(job) - offerResources.toIterator.find(_._2.canSatisfy(neededResources)) match { + offerResources.toIterator.find { ors => + ors._2.canSatisfy(neededResources) && checkConstraints(ors._1.getAttributesList.asScala, job.constraints) + } match { case Some((offer, resources)) => // Subtract this job's resource requirements from the remaining available resources in this offer. resources -= neededResources diff --git a/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala b/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala index 36f8d5600..30df88d94 100644 --- a/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala +++ b/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala @@ -1,7 +1,8 @@ package org.apache.mesos.chronos.utils import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration -import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, DependencyBasedJob, DockerContainer, EnvironmentVariable, NetworkMode, ScheduleBasedJob, Volume, VolumeMode} +import org.apache.mesos.chronos.scheduler.jobs._ +import org.apache.mesos.chronos.scheduler.jobs.constraints._ import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode} @@ -165,6 +166,17 @@ class JobDeserializer extends JsonDeserializer[BaseJob] { container = DockerContainer(containerNode.get("image").asText, volumes, networkMode) } + val constraints = scala.collection.mutable.ListBuffer[Constraint]() + if (node.has("constraints")) { + for (c <- node.path("constraints")) { + c.get(1).asText match { + case ClusterConstraint.OPERATOR => + constraints.add(ClusterConstraint(c.get(0).asText, c.get(2).asText)) + case _ => + } + } + } + var parentList = scala.collection.mutable.ListBuffer[String]() if (node.has("parents")) { for (parent <- node.path("parents")) { @@ -177,7 +189,8 @@ class JobDeserializer extends JsonDeserializer[BaseJob] { async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled, errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority, runAsUser = runAsUser, container = container, environmentVariables = environmentVariables, shell = shell, - arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType) + arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType, + constraints = constraints) } else if (node.has("schedule")) { val scheduleTimeZone = if (node.has("scheduleTimeZone")) node.get("scheduleTimeZone").asText else "" new ScheduleBasedJob(node.get("schedule").asText, name = name, command = command, @@ -188,7 +201,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] { errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority, runAsUser = runAsUser, container = container, scheduleTimeZone = scheduleTimeZone, environmentVariables = environmentVariables, shell = shell, arguments = arguments, softError = softError, - dataProcessingJobType = dataProcessingJobType) + dataProcessingJobType = dataProcessingJobType, constraints = constraints) } else { /* schedule now */ new ScheduleBasedJob("R1//PT24H", name = name, command = command, epsilon = epsilon, successCount = successCount, @@ -197,7 +210,8 @@ class JobDeserializer extends JsonDeserializer[BaseJob] { async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled, errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority, runAsUser = runAsUser, container = container, environmentVariables = environmentVariables, shell = shell, - arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType) + arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType, + constraints = constraints) } } } diff --git a/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala b/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala index 24707aa23..8a97e144b 100644 --- a/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala +++ b/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala @@ -1,6 +1,7 @@ package org.apache.mesos.chronos.utils import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, DependencyBasedJob, ScheduleBasedJob} +import org.apache.mesos.chronos.scheduler.jobs.constraints.ClusterConstraint import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider} @@ -136,6 +137,20 @@ class JobSerializer extends JsonSerializer[BaseJob] { json.writeEndObject() } + json.writeFieldName("constraints") + json.writeStartArray() + baseJob.constraints.foreach { v => + json.writeStartArray() + v match { + case ClusterConstraint(attribute, value) => + json.writeString(attribute) + json.writeString(ClusterConstraint.OPERATOR) + json.writeString(value) + } + json.writeEndArray() + } + json.writeEndArray() + baseJob match { case depJob: DependencyBasedJob => json.writeFieldName("parents") diff --git a/src/test/scala/org/apache/mesos/chronos/scheduler/api/SerDeTest.scala b/src/test/scala/org/apache/mesos/chronos/scheduler/api/SerDeTest.scala index c5423b28d..d0831bbf9 100644 --- a/src/test/scala/org/apache/mesos/chronos/scheduler/api/SerDeTest.scala +++ b/src/test/scala/org/apache/mesos/chronos/scheduler/api/SerDeTest.scala @@ -1,5 +1,6 @@ package org.apache.mesos.chronos.scheduler.api +import org.apache.mesos.chronos.scheduler.jobs.constraints.ClusterConstraint import org.apache.mesos.chronos.scheduler.jobs.{DependencyBasedJob, DockerContainer, EnvironmentVariable, ScheduleBasedJob, _} import org.apache.mesos.chronos.utils.{JobDeserializer, JobSerializer} import com.fasterxml.jackson.databind.ObjectMapper @@ -33,10 +34,12 @@ class SerDeTest extends SpecificationWithJUnit { "-testOne" ) + val constraints = List(ClusterConstraint("rack", "rack-1")) + val a = new DependencyBasedJob(Set("B", "C", "D", "E"), "A", "noop", Minutes.minutes(5).toPeriod, 10L, 20L, "fooexec", "fooflags", 7, "foo@bar.com", "Foo", "Test dependency-based job", "TODAY", "YESTERDAY", true, container = container, environmentVariables = environmentVariables, - shell = false, arguments = arguments, softError = true) + shell = false, arguments = arguments, softError = true, constraints = constraints) val aStr = objectMapper.writeValueAsString(a) val aCopy = objectMapper.readValue(aStr, classOf[DependencyBasedJob]) @@ -67,10 +70,12 @@ class SerDeTest extends SpecificationWithJUnit { "-testOne" ) + val constraints = List(ClusterConstraint("rack", "rack-1")) + val a = new ScheduleBasedJob("FOO/BAR/BAM", "A", "noop", Minutes.minutes(5).toPeriod, 10L, 20L, "fooexec", "fooflags", 7, "foo@bar.com", "Foo", "Test schedule-based job", "TODAY", "YESTERDAY", true, container = container, environmentVariables = environmentVariables, - shell = true, arguments = arguments, softError = true) + shell = true, arguments = arguments, softError = true, constraints = constraints) val aStr = objectMapper.writeValueAsString(a) val aCopy = objectMapper.readValue(aStr, classOf[ScheduleBasedJob]) diff --git a/src/test/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraintSpec.scala b/src/test/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraintSpec.scala new file mode 100644 index 000000000..3df62a5c3 --- /dev/null +++ b/src/test/scala/org/apache/mesos/chronos/scheduler/jobs/constraints/ClusterConstraintSpec.scala @@ -0,0 +1,32 @@ +package org.apache.mesos.chronos.scheduler.jobs.constraints + +import org.apache.mesos.Protos +import org.specs2.mutable.SpecificationWithJUnit + +class ClusterConstraintSpec extends SpecificationWithJUnit { + + def createAttribute(name: String, value: String): Protos.Attribute = { + Protos.Attribute.newBuilder() + .setName(name) + .setText(Protos.Value.Text.newBuilder().setValue(value)) + .setType(Protos.Value.Type.TEXT) + .build() + } + + "matches attributes" in { + val attributes = List(createAttribute("dc", "north"), createAttribute("rack", "rack-1")) + + val constraint = ClusterConstraint("rack", "rack-1") + + constraint.matches(attributes) must_== true + } + + "does not match attributes" in { + val attributes = List(createAttribute("dc", "north"), createAttribute("rack", "rack-1")) + + val constraint = ClusterConstraint("rack", "rack-2") + + constraint.matches(attributes) must_== false + } + +}