-
Notifications
You must be signed in to change notification settings - Fork 314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Run a task completely even without time-periods #763
Merged
danielmitterdorfer
merged 2 commits into
elastic:master
from
danielmitterdorfer:task-completion-via-param-source
Sep 10, 2019
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1260,105 +1260,159 @@ def schedule_for(current_track, task, client_index): | |
runner_for_op = runner.runner_for(op.type) | ||
params_for_op = track.operation_parameters(current_track, op).partition(client_index, num_clients) | ||
|
||
if task.warmup_time_period is not None or task.time_period is not None: | ||
if requires_time_period_schedule(task, params_for_op): | ||
warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0 | ||
logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] seconds and a " | ||
"time period of [%s] seconds.", task.schedule, task, str(warmup_time_period), str(task.time_period)) | ||
return time_period_based(sched, warmup_time_period, task.time_period, runner_for_op, params_for_op) | ||
logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] " | ||
"seconds and a time period of [%s] seconds.", task.schedule, task.name, | ||
str(warmup_time_period), str(task.time_period)) | ||
loop_control = TimePeriodBased(warmup_time_period, task.time_period) | ||
else: | ||
warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0 | ||
if task.iterations: | ||
iterations = task.iterations | ||
elif params_for_op.size(): | ||
iterations = params_for_op.size() - warmup_iterations | ||
else: | ||
elif params_for_op.infinite: | ||
# this is usually the case if the parameter source provides a constant | ||
iterations = 1 | ||
logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%d] warmup iterations and " | ||
"[%d] iterations." % (task.schedule, op, warmup_iterations, iterations)) | ||
return iteration_count_based(sched, warmup_iterations, iterations, runner_for_op, params_for_op) | ||
else: | ||
iterations = None | ||
logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup " | ||
"iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations)) | ||
loop_control = IterationBased(warmup_iterations, iterations) | ||
|
||
return generator_for_schedule(task.name, sched, loop_control, runner_for_op, params_for_op) | ||
|
||
|
||
def requires_time_period_schedule(task, params): | ||
if task.warmup_time_period is not None or task.time_period is not None: | ||
return True | ||
# user has explicitly requested iterations | ||
if task.warmup_iterations is not None or task.iterations is not None: | ||
return False | ||
# If the parameter source ends after a finite amount of iterations, we will run with a time-based schedule | ||
return not params.infinite | ||
|
||
|
||
def time_period_based(sched, warmup_time_period, time_period, runner, params): | ||
def generator_for_schedule(task_name, sched, task_progress_control, runner, params): | ||
""" | ||
Calculates the necessary schedule for time period based operations. | ||
Creates a generator that will yield individual task invocations for the provided schedule. | ||
|
||
:param sched: The scheduler for this task. Must not be None. | ||
:param warmup_time_period: The time period in seconds that is considered for warmup. Must not be None; provide zero instead. | ||
:param time_period: The time period in seconds that is considered for measurement. May be None. | ||
:param task_name: The name of the task for which the schedule is generated. | ||
:param sched: The scheduler for this task. | ||
:param task_progress_control: Controls how and how often this generator will loop. | ||
:param runner: The runner for a given operation. | ||
:param params: The parameter source for a given operation. | ||
:return: A generator for the corresponding parameters. | ||
""" | ||
next_scheduled = 0 | ||
start = time.perf_counter() | ||
logger = logging.getLogger(__name__) | ||
if time_period is None: | ||
iterations = params.size() | ||
if iterations: | ||
logger.info("No time-period property specified. Will run for at most [%d] iterations.", iterations) | ||
for it in range(0, iterations): | ||
try: | ||
sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal | ||
percent_completed = (it + 1) / iterations | ||
yield (next_scheduled, sample_type, percent_completed, runner, params.params()) | ||
next_scheduled = sched.next(next_scheduled) | ||
except StopIteration: | ||
logger.info("Time-period-based schedule stopped due to StopIteration.") | ||
return | ||
logger.info("Time-period-based schedule stopped after the specified number of [%d] iterations.", iterations) | ||
else: | ||
logger.info("No time-period property specified. Will run as long as the parameter source provides values.") | ||
param_source_knows_progress = hasattr(params, "percent_completed") | ||
while True: | ||
try: | ||
sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal | ||
# does not contribute at all to completion. Hence, we cannot define completion. | ||
percent_completed = params.percent_completed if param_source_knows_progress else None | ||
yield (next_scheduled, sample_type, percent_completed, runner, params.params()) | ||
next_scheduled = sched.next(next_scheduled) | ||
except StopIteration: | ||
logger.info("Time-period-based schedule stopped due to StopIteration.") | ||
return | ||
if task_progress_control.infinite: | ||
logger.info("Parameter source will determine when the schedule for [%s] terminates.", task_name) | ||
param_source_knows_progress = hasattr(params, "percent_completed") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I love the name of this variable 👍 |
||
task_progress_control.start() | ||
while True: | ||
try: | ||
# does not contribute at all to completion. Hence, we cannot define completion. | ||
percent_completed = params.percent_completed if param_source_knows_progress else None | ||
yield (next_scheduled, task_progress_control.sample_type, percent_completed, runner, params.params()) | ||
next_scheduled = sched.next(next_scheduled) | ||
task_progress_control.next() | ||
except StopIteration: | ||
logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name) | ||
return | ||
else: | ||
duration = warmup_time_period + time_period | ||
end = start + duration | ||
logger.info("Time-period-based schedule will run for a total of [%d] seconds.", duration) | ||
while time.perf_counter() < end: | ||
task_progress_control.start() | ||
logger.info("%s schedule will determine when the schedule for [%s] terminates.", | ||
str(task_progress_control), task_name) | ||
while not task_progress_control.completed: | ||
try: | ||
elapsed = time.perf_counter() - start | ||
sample_type = metrics.SampleType.Warmup if elapsed < warmup_time_period else metrics.SampleType.Normal | ||
percent_completed = elapsed / duration | ||
yield (next_scheduled, sample_type, percent_completed, runner, params.params()) | ||
yield (next_scheduled, | ||
task_progress_control.sample_type, | ||
task_progress_control.percent_completed, | ||
runner, | ||
params.params()) | ||
next_scheduled = sched.next(next_scheduled) | ||
task_progress_control.next() | ||
except StopIteration: | ||
logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name) | ||
return | ||
logger.info("Time-period-based schedule stopped after the specified time period of [%d] seconds.", duration) | ||
logger.info("%s schedule for [%s] stopped regularly.", str(task_progress_control), task_name) | ||
|
||
|
||
def iteration_count_based(sched, warmup_iterations, iterations, runner, params): | ||
""" | ||
Calculates the necessary schedule based on a given number of iterations. | ||
class TimePeriodBased: | ||
def __init__(self, warmup_time_period, time_period): | ||
self._warmup_time_period = warmup_time_period | ||
self._time_period = time_period | ||
if warmup_time_period is not None and time_period is not None: | ||
self._duration = self._warmup_time_period + self._time_period | ||
else: | ||
self._duration = None | ||
self._start = None | ||
self._now = None | ||
|
||
:param sched: The scheduler for this task. Must not be None. | ||
:param warmup_iterations: The number of warmup iterations to run. 0 if no warmup should be performed. | ||
:param iterations: The number of measurement iterations to run. | ||
:param runner: The runner for a given operation. | ||
:param params: The parameter source for a given operation. | ||
:return: A generator for the corresponding parameters. | ||
""" | ||
next_scheduled = 0 | ||
total_iterations = warmup_iterations + iterations | ||
logger = logging.getLogger(__name__) | ||
if total_iterations == 0: | ||
raise exceptions.RallyAssertionError("Operation must run at least for one iteration.") | ||
for it in range(0, total_iterations): | ||
try: | ||
sample_type = metrics.SampleType.Warmup if it < warmup_iterations else metrics.SampleType.Normal | ||
percent_completed = (it + 1) / total_iterations | ||
yield (next_scheduled, sample_type, percent_completed, runner, params.params()) | ||
next_scheduled = sched.next(next_scheduled) | ||
except StopIteration: | ||
logger.info("Iteration-count-based schedule stopped due to StopIteration.") | ||
return | ||
logger.info("Iteration-count-based schedule stopped after [%d] warmup iterations and [%d] iterations.", | ||
warmup_iterations, iterations) | ||
def start(self): | ||
self._now = time.perf_counter() | ||
self._start = self._now | ||
|
||
@property | ||
def _elapsed(self): | ||
return self._now - self._start | ||
|
||
@property | ||
def sample_type(self): | ||
return metrics.SampleType.Warmup if self._elapsed < self._warmup_time_period else metrics.SampleType.Normal | ||
|
||
@property | ||
def infinite(self): | ||
return self._time_period is None | ||
|
||
@property | ||
def percent_completed(self): | ||
return self._elapsed / self._duration | ||
|
||
@property | ||
def completed(self): | ||
return self._now >= (self._start + self._duration) | ||
|
||
def next(self): | ||
self._now = time.perf_counter() | ||
|
||
def __str__(self): | ||
return "time-period-based" | ||
|
||
|
||
class IterationBased: | ||
def __init__(self, warmup_iterations, iterations): | ||
self._warmup_iterations = warmup_iterations | ||
self._iterations = iterations | ||
if warmup_iterations is not None and iterations is not None: | ||
self._total_iterations = self._warmup_iterations + self._iterations | ||
if self._total_iterations == 0: | ||
raise exceptions.RallyAssertionError("Operation must run at least for one iteration.") | ||
else: | ||
self._total_iterations = None | ||
self._it = None | ||
|
||
def start(self): | ||
self._it = 0 | ||
|
||
@property | ||
def sample_type(self): | ||
return metrics.SampleType.Warmup if self._it < self._warmup_iterations else metrics.SampleType.Normal | ||
|
||
@property | ||
def infinite(self): | ||
return self._iterations is None | ||
|
||
@property | ||
def percent_completed(self): | ||
return (self._it + 1) / self._total_iterations | ||
|
||
@property | ||
def completed(self): | ||
return self._it >= self._total_iterations | ||
|
||
def next(self): | ||
self._it += 1 | ||
|
||
def __str__(self): | ||
return "iteration-count-based" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are missing the crucial information here about how the custom parameter source can trigger a task stop by raising a
StopIteration
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I've addressed this in 06add96.