Memory prediction and task scaling #10
Changes from 85 commits
New file: CombiPredictor.java
@@ -0,0 +1,85 @@
/*
 * Copyright (c) 2023, Florian Friederici. All rights reserved.
 *
 * This code is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option)
 * any later version.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this work. If not, see <https://www.gnu.org/licenses/>.
 */

package cws.k8s.scheduler.memory;

import java.math.BigDecimal;

import cws.k8s.scheduler.model.Task;
import lombok.extern.slf4j.Slf4j;

//@formatter:off
/**
 * CombiPredictor combines predictions made by ConstantPredictor and
 * LinearPredictor.
 *
 * LinearPredictor fails if there are no inputSize differences between tasks;
 * ConstantPredictor can handle this case. So CombiPredictor will run both and
 * decide dynamically which prediction to apply.
 *
 * @author Florian Friederici
 *
 */
//@formatter:on
@Slf4j
public class CombiPredictor implements MemoryPredictor {

    ConstantPredictor constantPredictor;
    LinearPredictor linearPredictor;

    public CombiPredictor() {
        this.constantPredictor = new ConstantPredictor();
        this.linearPredictor = new LinearPredictor();
    }

    @Override
    public void addObservation(Observation o) {
        log.debug("CombiPredictor.addObservation({})", o);
        constantPredictor.addObservation(o);
        linearPredictor.addObservation(o);
    }

    @Override
    public String queryPrediction(Task task) {
        String taskName = task.getConfig().getTask();
        log.debug("CombiPredictor.queryPrediction({},{})", taskName, task.getInputSize());

        String constantPrediction = constantPredictor.queryPrediction(task);
        String linearPrediction = linearPredictor.queryPrediction(task);

        if (constantPrediction == null && linearPrediction == null) {
            // no prediction available at all
            return null;
        }

        if (constantPrediction != null && linearPrediction == null) {
            // only the constantPrediction is available
            return constantPrediction;
        }

        if (constantPrediction == null && linearPrediction != null) {
            // only the linearPrediction is available (unusual case)
            return linearPrediction;
        }

        log.debug("constantPrediction={}, linearPrediction={}, difference={}", constantPrediction, linearPrediction, new BigDecimal(constantPrediction).subtract(new BigDecimal(linearPrediction)));

        // prefer linearPrediction if both are available
        return linearPrediction;
    }

}
New file: ConstantPredictor.java
@@ -0,0 +1,98 @@
/*
 * Copyright (c) 2023, Florian Friederici. All rights reserved.
 *
 * This code is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option)
 * any later version.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this work. If not, see <https://www.gnu.org/licenses/>.
 */

package cws.k8s.scheduler.memory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;

import cws.k8s.scheduler.model.Task;
import lombok.extern.slf4j.Slf4j;

// @formatter:off
/**
 * ConstantPredictor will use the following strategy:
 *
 * - In case the task was successful:
 *   - let the next prediction be 10% higher than the peakRss was
 *
 * - In case the task has failed:
 *   - reset to the initial value
 *
 * I.e. the suggestions from ConstantPredictor do not depend on the input
 * size of the tasks.
 *
 * @author Florian Friederici
 *
 */
// @formatter:on
@Slf4j
class ConstantPredictor implements MemoryPredictor {

    Map<String, BigDecimal> model;
    Map<String, BigDecimal> initialValue;

    public ConstantPredictor() {
        model = new HashMap<>();
        initialValue = new HashMap<>();
    }

    @Override
    public void addObservation(Observation o) {
        log.debug("ConstantPredictor.addObservation({})", o);
        if (!TaskScaler.checkObservationSanity(o)) {
            log.warn("dismiss observation {}", o);
            return;
        }

        // store initial ramRequest value per task
        if (!initialValue.containsKey(o.task)) {
            initialValue.put(o.task, o.getRamRequest());
        }

        if (Boolean.TRUE.equals(o.success)) {
            // set model to peakRss + 10%
            if (model.containsKey(o.task)) {
                model.replace(o.task, o.peakRss.multiply(new BigDecimal("1.1")).setScale(0, RoundingMode.CEILING));
            } else {
                model.put(o.task, o.peakRss.multiply(new BigDecimal("1.1")).setScale(0, RoundingMode.CEILING));
            }
        } else {
            // reset to initialValue
            if (model.containsKey(o.task)) {
                model.replace(o.task, this.initialValue.get(o.task));
            } else {
                model.put(o.task, o.ramRequest.multiply(new BigDecimal(2)).setScale(0, RoundingMode.CEILING));
            }
        }

    }

    @Override
    public String queryPrediction(Task task) {
        String taskName = task.getConfig().getTask();
        log.debug("ConstantPredictor.queryPrediction({})", taskName);

        if (model.containsKey(taskName)) {
            return model.get(taskName).toPlainString();
        } else {
            return null;
        }
    }
}
New file: LinearPredictor.java
@@ -0,0 +1,108 @@
/*
 * Copyright (c) 2023, Florian Friederici. All rights reserved.
 *
 * This code is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option)
 * any later version.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this work. If not, see <https://www.gnu.org/licenses/>.
 */

package cws.k8s.scheduler.memory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.math3.stat.regression.SimpleRegression;

import cws.k8s.scheduler.model.Task;
import lombok.extern.slf4j.Slf4j;

//@formatter:off
/**
 * LinearPredictor will use the following strategy:
 *
 * If there are fewer than 2 observations, give no prediction; otherwise,
 * calculate a linear regression model and provide predictions.
 *
 * Predictions start with 10% over-provisioning. If tasks fail, this will
 * increase automatically.
 *
 * @author Florian Friederici
 *
 */
//@formatter:on
@Slf4j
public class LinearPredictor implements MemoryPredictor {

    Map<String, SimpleRegression> model;
    Map<String, Double> overprovisioning;

    public LinearPredictor() {
        model = new HashMap<>();
        overprovisioning = new HashMap<>();
    }

    @Override
    public void addObservation(Observation o) {
        log.debug("LinearPredictor.addObservation({})", o);
        if (!TaskScaler.checkObservationSanity(o)) {
            log.warn("dismiss observation {}", o);
            return;
        }

        if (!overprovisioning.containsKey(o.task)) {
            overprovisioning.put(o.task, 1.1);
        }

        if (Boolean.TRUE.equals(o.success)) {
            if (!model.containsKey(o.task)) {
                model.put(o.task, new SimpleRegression());
            }

            double x = o.getInputSize();
            double y = o.getPeakRss().doubleValue();
            model.get(o.task).addData(x, y);
        } else {
            log.debug("overprovisioning value will increase due to task failure");
            Double old = overprovisioning.get(o.task);
            overprovisioning.put(o.task, old + 0.05);
[Review thread on the over-provisioning update above]
Comment: I assume this will increase at the beginning, where we might make a few wrong predictions. However, the overprovisioning is never decreased if we get more observations and maybe better predictions.
Comment: The overprovisioning was my first attempt at solving the problem of too-low estimates.
Comment: How about a strategy that checks the predictions for every task in the training data and determines the highest offset needed to fit all/95%/99% of the values? The percentage could be a user-defined hyperparameter. (A sketch of this idea follows after the file.)
        }
    }

    @Override
    public String queryPrediction(Task task) {
        String taskName = task.getConfig().getTask();
        log.debug("LinearPredictor.queryPrediction({},{})", taskName, task.getInputSize());

        if (!model.containsKey(taskName)) {
            log.debug("LinearPredictor has no model for {}", taskName);
            return null;
        }

        SimpleRegression simpleRegression = model.get(taskName);
        double prediction = simpleRegression.predict(task.getInputSize());

        if (Double.isNaN(prediction)) {
            log.debug("No prediction possible for {}", taskName);
            return null;
        }

        if (prediction < 0) {
            log.warn("prediction would be negative: {}", prediction);
            return null;
        }

        return BigDecimal.valueOf(prediction).multiply(BigDecimal.valueOf(overprovisioning.get(taskName))).setScale(0, RoundingMode.CEILING).toPlainString();

    }

}
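To make the review suggestion above concrete, the following is a minimal standalone sketch (not part of this PR) of fitting the over-provisioning factor from the training data instead of only bumping it on failures: it picks the smallest multiplicative offset such that a chosen fraction (e.g. 95%) of the observed peaks stays below the regression prediction times that offset. The class name, the quantile parameter, and the plain double[] observation format are illustrative assumptions, not part of the scheduler's API.

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.math3.stat.regression.SimpleRegression;

// Standalone sketch: derive the over-provisioning factor from the training
// data (observed input size / peak RSS pairs) instead of increasing it only
// when tasks fail.
public class OffsetFitter {

    /**
     * Find the smallest factor f such that at least `quantile` (e.g. 0.95)
     * of the observed peaks satisfy peak <= prediction * f.
     */
    static double fitOffset(SimpleRegression model, List<double[]> observations, double quantile) {
        List<Double> ratios = new ArrayList<>();
        for (double[] obs : observations) {
            double predicted = model.predict(obs[0]); // obs[0] = input size
            if (predicted > 0) {
                ratios.add(obs[1] / predicted);       // obs[1] = observed peak RSS
            }
        }
        if (ratios.isEmpty()) {
            return 1.1; // fall back to the current default over-provisioning
        }
        ratios.sort(Double::compareTo);
        int idx = (int) Math.ceil(quantile * ratios.size()) - 1;
        double factor = ratios.get(Math.max(idx, 0));
        return Math.max(factor, 1.0); // never scale predictions below the plain fit
    }

    public static void main(String[] args) {
        SimpleRegression model = new SimpleRegression();
        List<double[]> observations = new ArrayList<>();
        double[][] samples = { { 100, 1200 }, { 200, 2100 }, { 300, 3400 }, { 400, 4100 } };
        for (double[] s : samples) {
            model.addData(s[0], s[1]);
            observations.add(s);
        }
        System.out.println(fitOffset(model, observations, 0.95));
    }
}

Such a fit could be recomputed after each observation, which would also let the factor shrink again once the regression becomes accurate, addressing the first comment in the thread.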
New file: MemoryPredictor.java
@@ -0,0 +1,61 @@
/*
 * Copyright (c) 2023, Florian Friederici. All rights reserved.
 *
 * This code is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option)
 * any later version.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this work. If not, see <https://www.gnu.org/licenses/>.
 */

package cws.k8s.scheduler.memory;

import cws.k8s.scheduler.model.Task;

// @formatter:off
/**
 * The MemoryPredictor has two important interfaces:
 *
 * 1) addObservation()
 *    - "add a new observation": after a workflow task has finished, the
 *      observation result is collected in the MemoryPredictor
 *
 * 2) queryPrediction()
 *    - "ask for a suggestion": at any time, the MemoryPredictor can be asked
 *      what its guess is on the resource requirement of a task
 *
 * Different strategies can be tried and exchanged easily; they just have to
 * implement these two interfaces. See ConstantPredictor and LinearPredictor
 * for concrete strategies.
 *
 * @author Florian Friederici
 *
 */
// @formatter:on
interface MemoryPredictor {

    /**
     * input an observation into the MemoryPredictor, to be used to learn the
     * memory usage of tasks and create suggestions
     *
     * @param o the observation that was made
     */
    void addObservation(Observation o);

    /**
     * ask the MemoryPredictor for a suggestion on how much memory should be
     * assigned to the task.
     *
     * @param task the task to get a suggestion for
     * @return null if no suggestion is possible, otherwise the value to be used
     */
    String queryPrediction(Task task);

}
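For readers outside the codebase, the following self-contained toy illustrates the two-call contract described in the Javadoc above: feed observations in after tasks finish, query a (possibly null) suggestion before scheduling the next instance. The class, the plain-value parameters, and the "last peak plus 20%" rule are simplified stand-ins, not the PR's actual Observation, Task, or predictor classes.

import java.util.HashMap;
import java.util.Map;

// Standalone toy showing the MemoryPredictor call pattern, not the PR's code.
public class PredictorContractDemo {

    // toy strategy: remember the last successful peak per task, suggest +20%
    private final Map<String, Long> lastPeak = new HashMap<>();

    // called once per finished task ("add a new observation")
    public void addObservation(String taskName, boolean success, long peakRssBytes) {
        if (success) {
            lastPeak.put(taskName, peakRssBytes);
        }
    }

    // called when a new instance of the task is scheduled ("ask for a suggestion")
    public String queryPrediction(String taskName) {
        Long peak = lastPeak.get(taskName);
        return peak == null ? null : Long.toString((long) Math.ceil(peak * 1.2));
    }

    public static void main(String[] args) {
        PredictorContractDemo demo = new PredictorContractDemo();
        System.out.println(demo.queryPrediction("bwa_mem"));   // null: no observation yet
        demo.addObservation("bwa_mem", true, 3_000_000_000L);
        System.out.println(demo.queryPrediction("bwa_mem"));   // 3600000000
    }
}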
[Review thread on ConstantPredictor's model update ("set model to peakRss + 10%")]
Comment: Shouldn't the constant predictor always use the highest value ever seen? This would replace it with the most recent one.
Comment: The current behaviour is: if the scheduler provides the tasks in order, with the tasks that have the biggest input first, the prediction would follow and always shrink. But I agree that different "constant" strategies could be taken, e.g.:
...
Comment: Ordering by size descending is only one of many possible scheduling strategies. Ordering could also be ascending, random, or FIFO. We should use the maximum with an x% offset. (A sketch of this idea follows below.)
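A minimal sketch of the "maximum with x% offset" idea from this thread, again standalone and not the PR's code: keep the largest peakRss ever observed per task and always predict that maximum plus a configurable offset, so the order in which observations arrive no longer matters. The class name and the offset parameter are illustrative assumptions.

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch: a "constant" strategy that keeps the highest peak ever
// observed per task and adds a fixed offset, instead of overwriting the model
// with the most recent successful peak.
public class MaxConstantSketch {

    private final Map<String, BigDecimal> maxPeak = new HashMap<>();
    private final BigDecimal offsetFactor; // e.g. 1.10 for a +10% offset

    public MaxConstantSketch(BigDecimal offsetFactor) {
        this.offsetFactor = offsetFactor;
    }

    /** Record a successful task run; observation order no longer matters. */
    public void addPeak(String taskName, BigDecimal peakRss) {
        maxPeak.merge(taskName, peakRss, BigDecimal::max);
    }

    /** Return maxPeak * offset, or null if the task has not been seen yet. */
    public String predict(String taskName) {
        BigDecimal max = maxPeak.get(taskName);
        if (max == null) {
            return null;
        }
        return max.multiply(offsetFactor).setScale(0, RoundingMode.CEILING).toPlainString();
    }

    public static void main(String[] args) {
        MaxConstantSketch sketch = new MaxConstantSketch(new BigDecimal("1.10"));
        sketch.addPeak("fastqc", new BigDecimal("2048"));
        sketch.addPeak("fastqc", new BigDecimal("1024")); // smaller peak does not shrink the model
        System.out.println(sketch.predict("fastqc"));     // 2253
    }
}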