Skip to content

Commit

Permalink
Memory prediction and task scaling
Browse files Browse the repository at this point in the history
* collect task execution results and store them in the memory optimizer

* restructured TaskScaler

* cleanup debug logs

* cleanup Scheduler

* cleanup Task

* cleanup Task

* add hook after workflow is completed

* change MemoryOptimizer to be an interface, add two different Optimizers

* round suggestions to ceiling

* introduced LinearPredictor

* changed indentation to 4 spaces, like rest of the project uses

* fix mis-formatting

* fix typos

* initial implementation linearPredictor

* builder for observations

* added test for constant predictor

* remove wasted calculation from observation

* remove wasted calculation from observation

* add NonePredictorTest

* sanity checks for observations

* add negative case for ConstantPredictor

* assert rise and fall of suggestions

* added LinearPredictorTest

* avoid negative preditions

* use SimpleRegression for LinearPredictor

* fix naming to always be prediction, instead of suggestion

* fix naming

* fix some minor issues

* collect statistics

* removed Limits, rely solely on Requests instead

* added new CombiPredictor

* remove solved fixme

* csv export

* save statistics summary and csv into file in workflow baseDir

* added Tasks realtime to statistic, moved NfTrace reader to own utility class

* add test if trace file is missing and handling for that

* fix no resize when request was 0

* apply config only in dev profile

* statistics log execution and predictor

* log makespan

* add peak_vmem for sanity checks

* add unit tests for statistics

* added todos for missing testcases

* fix for-loop should continue, not break

* only invoke TaskScaler when config was given

* get memory predictor from config, not from environment

* removed double code

* prepare application.yml for merge

* prepare application.yml for merge

* fixed decimal seperator

* fix decimal seperator

* changed logging in dev profile

* improved predictor selection order

* added template for square predictor

* collect wasted in summary

* add wasted to statistics

* avoid updating tasks when no new model is available

* added new testcases

* change return value for missing file to -1

* changed sanity check

* fix constant predictor

* faster overprovisioning

* add wary predictor

* fix imports

* wary predictor

* filter realtime 0

* use vmem instead of rss

* correct tests

* require 4 successful observations

* ignore list feature

* never provide predictions lower than the lowest successful value was

* prevent cws from get stuck

* blacklist failed tasks

* removed flawed wasted from csv, added assigned node

* lower limit for request size 256MiB

* fix bad naming

* junit test for TaskScaler

* add remark for TaskScalerTest

* fix used predictor

* removed unimplemended square predictor

* removed unused generation feature from constant predictor

* removed unused generation feature

* cleanup classname

* removed testcase that is no longer in line with desired behaviour

* fixed comments

* add description to README

* add description to README

* catch exception that is thrown when InPlacePodVerticalScaling is not enabled

* add note on profiles in README

* always write log to file

* check reason for exception and improve error message, then disable task scaling

* fix comment

* fix formatting

* moved patchTaskMemory method

* add tracing note in README

* reduce loglevel

* change predictor interface to return BigDecimal

* extracted constant for lowest memory request value

* add o.taskName to log, when available
  • Loading branch information
friederici authored Mar 4, 2024
1 parent c1a30da commit fb7ebac
Show file tree
Hide file tree
Showing 26 changed files with 3,107 additions and 0 deletions.
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,41 @@ spec:
claimName: api-exp-data
```

#### Profiles
This is a Spring Boot application, that can be run with profiles. The "default" profile is used if no configuration is set. The "dev" profile can be enabled by setting the JVM System Parameter

-Dspring.profiles.active=dev
or Environment Variable

export spring_profiles_active=dev
or via the corresponding setting in your development environment or within the pod definition.

Example:

$ SCHEDULER_NAME=workflow-scheduler java -Dspring.profiles.active=dev -jar cws-k8s-scheduler-1.2-SNAPSHOT.jar

The "dev" profile is useful for debugging and reporting problems because it increases the log-level.

---
#### Memory Prediction and Task Scaling
- Supported if used together with [nf-cws](https://github.com/CommonWorkflowScheduler/nf-cws) version 1.0.4 or newer.
- Kubernetes Feature InPlacePodVerticalScaling must be enabled. This is available starting from Kubernetes v1.27. See [KEP 1287](https://github.com/kubernetes/enhancements/issues/1287) for the current status.
- It is required to enable traces in Nextflow via `trace.enabled = true` in the config file, or the commandline option `-with-trace`.

The memory predictor that shall be used for task scaling is set via the nf-cws configuration. If not set, task scaling is disabled.

| cws.memoryPredictor | Behaviour |
|---------------------|------------------------------------------------------------------------------------------------------------------------------------|
| "" | Disabled if empty or not set. |
| none | NonePredictor, will never make any predictions and consequently no task scaling will occur. Used for testing and benchmarking only.|
| constant | ConstantPredictor, will try to predict a constant memory usage pattern. |
| linear | LinearPredictor, will try to predict a memory usage that is linear to the task input size. |
| combi | CombiPredictor, combines predictions from ConstantPredictor and LinearPredictor. |
| wary | WaryPredictor, behaves like LinearPredictor but is more cautious about its predictions. |
| default | Query the environment variable "MEMORY_PREDICTOR_DEFAULT" and use the value that is set there. |

If a memory predictor is selected (i.e. setting is not disabled), the implementation will locally record statistics and print out the result after the workflow has finished. This can be disabled via the environment variable "DISABLE_STATISTICS". If this is set to any string, the implementation will not collect and print out the results.

---

If you use this software or artifacts in a publication, please cite it as:
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>

</dependencies>

<build>
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/cws/k8s/scheduler/client/KubernetesClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import cws.k8s.scheduler.model.NodeWithAlloc;
import cws.k8s.scheduler.model.PodWithAge;
import cws.k8s.scheduler.model.Task;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
Expand Down Expand Up @@ -218,4 +219,55 @@ public void onClose(WatcherException cause) {

}

/**
* After some testing, this was found to be the only reliable way to patch a pod
* using the Kubernetes client.
*
* It will create a patch for the memory limits and request values and submit it
* to the cluster.
*
* @param t the task to be patched
* @param value the value to be set
* @return false if patching failed because of InPlacePodVerticalScaling
*/
public boolean patchTaskMemory(Task t, String value) {
String namespace = t.getPod().getMetadata().getNamespace();
String podname = t.getPod().getName();
log.debug("namespace: {}, podname: {}", namespace, podname);
// @formatter:off
String patch = "kind: Pod\n"
+ "apiVersion: v1\n"
+ "metadata:\n"
+ " name: PODNAME\n"
+ " namespace: NAMESPACE\n"
+ "spec:\n"
+ " containers:\n"
+ " - name: PODNAME\n"
+ " resources:\n"
+ " limits:\n"
+ " memory: LIMIT\n"
+ " requests:\n"
+ " memory: REQUEST\n"
+ "\n";
// @formatter:on
patch = patch.replace("NAMESPACE", namespace);
patch = patch.replace("PODNAME", podname);
patch = patch.replace("LIMIT", value);
patch = patch.replace("REQUEST", value);
log.debug(patch);

try {
this.pods().inNamespace(namespace).withName(podname).patch(patch);
} catch (KubernetesClientException e) {
// this typically happens when the feature gate InPlacePodVerticalScaling was not enabled
if (e.toString().contains("Forbidden: pod updates may not change fields other than")) {
log.error("Could not patch task. Please make sure that the feature gate 'InPlacePodVerticalScaling' is enabled in Kubernetes. See https://github.com/kubernetes/enhancements/issues/1287 for details. Task scaling will now be disabled for the rest of this workflow execution.");
return false;
} else {
log.error("Could not patch task: {}", e);
}
}
return true;
}

}
85 changes: 85 additions & 0 deletions src/main/java/cws/k8s/scheduler/memory/CombiPredictor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2023, Florian Friederici. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* This code is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this work. If not, see <https://www.gnu.org/licenses/>.
*/

package cws.k8s.scheduler.memory;

import java.math.BigDecimal;

import cws.k8s.scheduler.model.Task;
import lombok.extern.slf4j.Slf4j;

//@formatter:off
/**
* CombiPredictor will combine predictions made by ConstantPredictor and
* LineraPredictor.
*
* LinearPredictor fails if there are no inputSize differences to tasks,
* ConstantPredictor can handle this case. So CombiPredictor will run both and
* decide dynamically which predictions to apply.
*
* @author Florian Friederici
*
*/
//@formatter:on
@Slf4j
public class CombiPredictor implements MemoryPredictor {

ConstantPredictor constantPredictor;
LinearPredictor linearPredictor;

public CombiPredictor() {
this.constantPredictor = new ConstantPredictor();
this.linearPredictor = new LinearPredictor();
}

@Override
public void addObservation(Observation o) {
log.debug("CombiPredictor.addObservation({})", o);
constantPredictor.addObservation(o);
linearPredictor.addObservation(o);
}

@Override
public BigDecimal queryPrediction(Task task) {
String taskName = task.getConfig().getTask();
log.debug("CombiPredictor.queryPrediction({},{})", taskName, task.getInputSize());

BigDecimal constantPrediction = constantPredictor.queryPrediction(task);
BigDecimal linearPrediction = linearPredictor.queryPrediction(task);

if (constantPrediction==null && linearPrediction==null) {
// no prediction available at all
return null;
}

if (constantPrediction!=null && linearPrediction==null) {
// only the constantPrediction is available
return constantPrediction;
}

if (constantPrediction==null && linearPrediction!=null) {
// only the linearPrediction is available (unusual case)
return linearPrediction;
}

log.debug("constantPrediction={}, linearPrediction={}, difference={}", constantPrediction, linearPrediction, constantPrediction.subtract(linearPrediction));

// prefer linearPrediction if both would be available
return linearPrediction;
}

}
98 changes: 98 additions & 0 deletions src/main/java/cws/k8s/scheduler/memory/ConstantPredictor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2023, Florian Friederici. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* This code is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this work. If not, see <https://www.gnu.org/licenses/>.
*/

package cws.k8s.scheduler.memory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;

import cws.k8s.scheduler.model.Task;
import lombok.extern.slf4j.Slf4j;

// @formatter:off
/**
* ConstantPredictor will use the following strategy:
*
* - In case task was successful:
* - let the next prediction be 10% higher, then the peakRss was
*
* - In case task has failed:
* - reset to initial value
*
* I.e. the suggestions from ConstantPredictor are not dependent on the input
* size of the tasks.
*
* @author Florian Friederici
*
*/
// @formatter:on
@Slf4j
class ConstantPredictor implements MemoryPredictor {

Map<String, BigDecimal> model;
Map<String, BigDecimal> initialValue;

public ConstantPredictor() {
model = new HashMap<>();
initialValue = new HashMap<>();
}

@Override
public void addObservation(Observation o) {
log.debug("ConstantPredictor.addObservation({})", o);
if (!TaskScaler.checkObservationSanity(o)) {
log.warn("dismiss observation {}", o);
return;
}

// store initial ramRequest value per task
if (!initialValue.containsKey(o.task)) {
initialValue.put(o.task, o.getRamRequest());
}

if (Boolean.TRUE.equals(o.success)) {
// set model to peakRss + 10%
if (model.containsKey(o.task)) {
model.replace(o.task, o.peakRss.multiply(new BigDecimal("1.1")).setScale(0, RoundingMode.CEILING));
} else {
model.put(o.task, o.peakRss.multiply(new BigDecimal("1.1")).setScale(0, RoundingMode.CEILING));
}
} else {
// reset to initialValue
if (model.containsKey(o.task)) {
model.replace(o.task, this.initialValue.get(o.task));
} else {
model.put(o.task, o.ramRequest.multiply(new BigDecimal(2)).setScale(0, RoundingMode.CEILING));
}
}

}

@Override
public BigDecimal queryPrediction(Task task) {
String taskName = task.getConfig().getTask();
log.debug("ConstantPredictor.queryPrediction({})", taskName);

if (model.containsKey(taskName)) {
return model.get(taskName);
} else {
return null;
}
}
}
Loading

0 comments on commit fb7ebac

Please sign in to comment.