Skip to content

Commit

Permalink
Improve feedback on errors
Browse files Browse the repository at this point in the history
If an exception occurs in the load test driver, we now
communicate the problem to the control process so that
Rally can show a more helpful feedback message.

Closes #153
  • Loading branch information
danielmitterdorfer committed Oct 6, 2016
1 parent 5a747bc commit 1ec82a5
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 56 deletions.
110 changes: 69 additions & 41 deletions esrally/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ def __init__(self, metrics):
self.metrics = metrics


# Workaround for https://github.com/godaddy/Thespian/issues/22
class BenchmarkFailure:
    """
    Indicates a failure in the benchmark execution due to an exception.

    Instances are sent as plain messages between actors so the receiver can
    report the problem instead of waiting for a result that never arrives.

    :param message: A human-readable description of what went wrong.
    :param cause: The exception that triggered the failure, or ``None`` when
                  there is no underlying exception to report.
    """
    def __init__(self, message, cause=None):
        self.message = message
        self.cause = cause

    def __repr__(self):
        # A readable representation makes log lines and error messages that
        # stringify this object useful, rather than showing an object address.
        return "%s(message=%r, cause=%r)" % (type(self).__name__, self.message, self.cause)


class Driver(thespian.actors.Actor):
WAKEUP_INTERVAL_SECONDS = 1
"""
Expand Down Expand Up @@ -121,16 +131,31 @@ def __init__(self):
self.most_recent_sample_per_client = {}

def receiveMessage(self, msg, sender):
if isinstance(msg, StartBenchmark):
self.start_benchmark(msg, sender)
elif isinstance(msg, JoinPointReached):
self.joinpoint_reached(msg)
elif isinstance(msg, UpdateSamples):
self.update_samples(msg)
elif isinstance(msg, thespian.actors.WakeupMessage):
if not self.finished():
self.update_progress_message()
self.wakeupAfter(datetime.timedelta(seconds=Driver.WAKEUP_INTERVAL_SECONDS))
try:
if isinstance(msg, StartBenchmark):
self.start_benchmark(msg, sender)
elif isinstance(msg, JoinPointReached):
self.joinpoint_reached(msg)
elif isinstance(msg, UpdateSamples):
self.update_samples(msg)
elif isinstance(msg, thespian.actors.WakeupMessage):
if not self.finished():
self.update_progress_message()
self.wakeupAfter(datetime.timedelta(seconds=Driver.WAKEUP_INTERVAL_SECONDS))
elif isinstance(msg, BenchmarkFailure):
logger.error("Main driver received a fatal exception from a load generator. Shutting down.")
self.metrics_store.close()
for driver in self.drivers:
self.send(driver, thespian.actors.ActorExitRequest())
self.send(self.start_sender, msg)
self.send(self.myAddress, thespian.actors.ActorExitRequest())
except Exception as e:
logger.exception("Main driver encountered a fatal exception. Shutting down.")
self.metrics_store.close()
for driver in self.drivers:
self.send(driver, thespian.actors.ActorExitRequest())
self.send(self.start_sender, BenchmarkFailure("Could not execute benchmark", e))
self.send(self.myAddress, thespian.actors.ActorExitRequest())

def start_benchmark(self, msg, sender):
logger.info("Benchmark is about to start.")
Expand Down Expand Up @@ -274,39 +299,42 @@ def __init__(self):
self.start_driving = False

def receiveMessage(self, msg, sender):
if isinstance(msg, StartLoadGenerator):
logger.debug("client [%d] is about to start." % msg.client_id)
self.master = sender
self.client_id = msg.client_id
self.es = client.EsClientFactory(msg.config.opts("client", "hosts"), msg.config.opts("client", "options")).create()
self.config = msg.config
self.indices = msg.indices
self.tasks = msg.tasks
self.current_task = 0
self.start_timestamp = time.perf_counter()
self.drive()
elif isinstance(msg, Drive):
logger.debug("Client [%d] is continuing its work at task index [%d] on [%f]." %
(self.client_id, self.current_task, msg.client_start_timestamp))
self.master = sender
self.start_driving = True
self.wakeupAfter(datetime.timedelta(seconds=time.perf_counter() - msg.client_start_timestamp))
elif isinstance(msg, thespian.actors.WakeupMessage):
logger.debug("client [%d] woke up." % self.client_id)
# it would be better if we could send ourselves a message at a specific time, simulate this with a boolean...
if self.start_driving:
self.start_driving = False
try:
if isinstance(msg, StartLoadGenerator):
logger.debug("client [%d] is about to start." % msg.client_id)
self.master = sender
self.client_id = msg.client_id
self.es = client.EsClientFactory(msg.config.opts("client", "hosts"), msg.config.opts("client", "options")).create()
self.config = msg.config
self.indices = msg.indices
self.tasks = msg.tasks
self.current_task = 0
self.start_timestamp = time.perf_counter()
self.drive()
elif isinstance(msg, Drive):
logger.debug("Client [%d] is continuing its work at task index [%d] on [%f]." %
(self.client_id, self.current_task, msg.client_start_timestamp))
self.master = sender
self.start_driving = True
self.wakeupAfter(datetime.timedelta(seconds=time.perf_counter() - msg.client_start_timestamp))
elif isinstance(msg, thespian.actors.WakeupMessage):
logger.debug("client [%d] woke up." % self.client_id)
# it would be better if we could send ourselves a message at a specific time, simulate this with a boolean...
if self.start_driving:
self.start_driving = False
self.drive()
else:
self.send_samples()
if self.executor_future is not None:
if self.executor_future.done():
self.executor_future = None
self.drive()
else:
self.wakeupAfter(datetime.timedelta(seconds=LoadGenerator.WAKEUP_INTERVAL_SECONDS))
else:
self.send_samples()
if self.executor_future is not None:
if self.executor_future.done():
self.executor_future = None
self.drive()
else:
self.wakeupAfter(datetime.timedelta(seconds=LoadGenerator.WAKEUP_INTERVAL_SECONDS))
else:
logger.debug("client [%d] received unknown message [%s] (ignoring)." % (self.client_id, str(msg)))
logger.debug("client [%d] received unknown message [%s] (ignoring)." % (self.client_id, str(msg)))
except Exception as e:
self.send(self.master, BenchmarkFailure("Fatal error in load generator [%d]" % self.client_id, e))

def drive(self):
task = None
Expand Down
35 changes: 20 additions & 15 deletions esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,17 @@ def benchmark_internal(ctx):
metrics_store = ctx.mechanic._metrics_store

cluster.on_benchmark_start()
completed = actors.ask(main_driver, driver.StartBenchmark(ctx.config, ctx.track, metrics_store.meta_info))
cluster.on_benchmark_stop()
if not hasattr(completed, "metrics"):
raise exceptions.RallyError("Driver has returned no metrics but instead [%s]. Terminating race without result." % str(completed))
metrics_store.bulk_add(completed.metrics)

ctx.mechanic.stop_engine(cluster)
ctx.mechanic.revise_candidate()
ctx.mechanic.stop_metrics()
result = actors.ask(main_driver, driver.StartBenchmark(ctx.config, ctx.track, metrics_store.meta_info))
if isinstance(result, driver.BenchmarkComplete):
cluster.on_benchmark_stop()
metrics_store.bulk_add(result.metrics)
ctx.mechanic.stop_engine(cluster)
ctx.mechanic.revise_candidate()
ctx.mechanic.stop_metrics()
elif isinstance(result, driver.BenchmarkFailure):
raise exceptions.RallyError(result.message, result.cause)
else:
raise exceptions.RallyError("Driver has returned no metrics but instead [%s]. Terminating race without result." % str(result))


# TODO dm module refactoring: mechanic
Expand Down Expand Up @@ -180,12 +182,15 @@ def benchmark_external(ctx):
metrics_store = ctx.mechanic._metrics_store

ctx.cluster.on_benchmark_start()
completed = actors.ask(main_driver, driver.StartBenchmark(ctx.config, ctx.track, metrics_store.meta_info))
ctx.cluster.on_benchmark_stop()
if not hasattr(completed, "metrics"):
raise exceptions.RallyError("Driver has returned no metrics but instead [%s]. Terminating race without result." % str(completed))
metrics_store.bulk_add(completed.metrics)
ctx.mechanic.stop_metrics()
result = actors.ask(main_driver, driver.StartBenchmark(ctx.config, ctx.track, metrics_store.meta_info))
if isinstance(result, driver.BenchmarkComplete):
ctx.cluster.on_benchmark_stop()
metrics_store.bulk_add(result.metrics)
ctx.mechanic.stop_metrics()
elif isinstance(result, driver.BenchmarkFailure):
raise exceptions.RallyError(result.message, result.cause)
else:
raise exceptions.RallyError("Driver has returned no metrics but instead [%s]. Terminating race without result." % str(result))


# TODO dm module refactoring: mechanic
Expand Down
2 changes: 2 additions & 0 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,10 @@ def main():
times_interrupted = 0
while not shutdown_complete and times_interrupted < 2:
try:
logger.info("Attempting to shutdown internal actor system.")
actors.shutdown()
shutdown_complete = True
logger.info("Shutdown completed.")
except KeyboardInterrupt:
times_interrupted += 1
logger.warn("User interrupted shutdown of internal actor system.")
Expand Down

0 comments on commit 1ec82a5

Please sign in to comment.