Skip to content

Commit

Permalink
List tasks for hung workers
Browse files Browse the repository at this point in the history
If the HangWatcher listener detects hung tests, it prints the following information for each of them:
- worker id
- test name
- test params
- output from .result file

Part of #107
  • Loading branch information
Sergei Voronezhskii committed Aug 10, 2018
1 parent ed45e1d commit 0483493
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
26 changes: 22 additions & 4 deletions dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,14 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
self.worker_next_id = 1

tasks_cnt = 0
self.current_task_queue = SimpleQueue()
self.task_queue_disps = dict()
for key, task_group in task_groups.items():
tasks_cnt += len(task_group['task_ids'])
task_queue_disp = TaskQueueDispatcher(key, task_group, randomize)
task_queue_disp = TaskQueueDispatcher(key,
task_group,
randomize,
self.current_task_queue)
self.task_queue_disps[key] = task_queue_disp
self.result_queues.append(task_queue_disp.result_queue)
self.task_queues.append(task_queue_disp.task_queue)
Expand All @@ -97,6 +101,7 @@ def __init__(self, task_groups, max_workers_cnt, randomize):

self.pid_to_worker_id = dict()
self.worker_id_to_pid = dict()
self.worker_id_to_task = dict()

self.randomize = randomize
self.tcp_port_dispatcher = TcpPortDispatcher()
Expand Down Expand Up @@ -137,9 +142,19 @@ def init_listeners(self):
no_output_timeout = float(args.no_output_timeout or 120)
hang_watcher = listeners.HangWatcher(
output_watcher.not_done_worker_ids, self.kill_all_workers,
warn_timeout, no_output_timeout)
warn_timeout, no_output_timeout,
self.get_task_by_worker_id, self.set_task_for_worker_id
)
self.listeners.append(hang_watcher)

def set_task_for_worker_id(self):
    """Drain the shared current-task queue into the worker-to-task map.

    Every queued item is unpacked as a ``(worker_id, task_id)`` pair and
    stored in ``self.worker_id_to_task``; when several items for the same
    worker are queued, the one read last wins.  The method returns once
    the queue reports itself empty.
    """
    pending = self.current_task_queue
    latest_task = self.worker_id_to_task
    while True:
        if pending.empty():
            break
        worker_id, task_id = pending.get()
        latest_task[worker_id] = task_id

def get_task_by_worker_id(self, worker_id):
    """Return the task most recently recorded for ``worker_id``.

    The value is looked up in ``self.worker_id_to_task``; a ``KeyError``
    is raised when no task has been recorded for that worker.
    """
    recorded_task = self.worker_id_to_task[worker_id]
    return recorded_task

def run_max_workers(self):
ok = True
new_workers_cnt = self.max_workers_cnt - self.workers_cnt
Expand Down Expand Up @@ -340,7 +355,7 @@ class TaskQueueDispatcher:
"""Incapsulate data structures necessary for dispatching workers working on
the one task queue.
"""
def __init__(self, key, task_group, randomize):
def __init__(self, key, task_group, randomize, current_task_queue):
self.key = key
self.gen_worker = task_group['gen_worker']
self.task_ids = task_group['task_ids']
Expand All @@ -353,6 +368,7 @@ def __init__(self, key, task_group, randomize):
self.randomize = False
self.result_queue = SimpleQueue()
self.task_queue = SimpleQueue()
self.current_task_queue = current_task_queue
for task_id in self.task_ids:
self.task_queue.put(task_id)
self.worker_ids = set()
Expand All @@ -366,7 +382,9 @@ def _run_worker(self, worker_id, tcp_port_range):
os.environ['TEST_RUN_TCP_PORT_END'] = str(tcp_port_range[1])
color_stdout.queue = self.result_queue
worker = self.gen_worker(worker_id)
worker.run_all(self.task_queue, self.result_queue)
worker.run_all(self.task_queue,
self.result_queue,
self.current_task_queue)

def add_worker(self, worker_id, tcp_port_range):
# Note: each of our workers should consume only one None, but for the
Expand Down
9 changes: 6 additions & 3 deletions lib/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ def run_task(self, task_id):
raise
return short_status

def run_loop(self, task_queue, result_queue):
# Note: it's not exception safe
def run_loop(self, task_queue, result_queue, current_task_queue):
""" called from 'run_all' """
while True:
task_id = self.task_get(task_queue)
Expand All @@ -285,6 +286,8 @@ def run_loop(self, task_queue, result_queue):
schema='test_var')
self.stop_worker(task_queue, result_queue)
break

current_task_queue.put((self.id, task_id))
short_status = self.run_task(task_id)
result_queue.put(self.wrap_result(task_id, short_status))
if not lib.Options().args.is_force and short_status == 'fail':
Expand All @@ -299,14 +302,14 @@ def run_loop(self, task_queue, result_queue):
raise VoluntaryStopException()
self.task_done(task_queue)

def run_all(self, task_queue, result_queue):
def run_all(self, task_queue, result_queue, current_task_queue):
if not self.initialized:
self.flush_all_tasks(task_queue, result_queue)
result_queue.put(self.done_marker())
return

try:
self.run_loop(task_queue, result_queue)
self.run_loop(task_queue, result_queue, current_task_queue)
except (KeyboardInterrupt, Exception):
self.stop_worker(task_queue, result_queue, cleanup=False)

Expand Down
28 changes: 24 additions & 4 deletions listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,13 @@ class HangError(Exception):

class HangWatcher(BaseWatcher):
"""Terminate all workers if no output received 'no_output_times' time."""
rg = re.compile('\.test.*$')

def __init__(self, get_not_done_worker_ids, kill_all_workers, warn_timeout,
kill_timeout):
kill_timeout, get_task_by_worker_id, set_task_for_worker_id):
self.get_not_done_worker_ids = get_not_done_worker_ids
self.get_task_by_worker_id = get_task_by_worker_id
self.set_task_for_worker_id = set_task_for_worker_id
self.kill_all_workers = kill_all_workers
self.warn_timeout = warn_timeout
self.kill_timeout = kill_timeout
Expand All @@ -186,14 +190,30 @@ def process_result(self, obj):
def process_timeout(self, delta_seconds):
self.warned_seconds_ago += delta_seconds
self.inactivity += delta_seconds
self.set_task_for_worker_id()
worker_ids = self.get_not_done_worker_ids()
if self.warned_seconds_ago < self.warn_timeout:
return
color_stdout("No output during %d seconds. "
"List of workers not reporting the status: %s; "
"Will abort after %d seconds without output.\n" % (
self.inactivity, worker_ids, self.kill_timeout),
"Will abort after %d seconds without output. "
"List of workers not reporting the status:\n" % (
self.inactivity, self.kill_timeout),
schema='test_var')
for worker_id in worker_ids:
task_name, task_param = self.get_task_by_worker_id(worker_id)
color_stdout("[{0:03d}] {1} {2}\n".format(worker_id,
task_name,
task_param or ''),
schema='test_var')
task_path = "{0:03d}_{1}".format(worker_id,
self.rg.sub('.result', task_name))
main_vardir = os.path.realpath(lib.Options().args.vardir)
rf = os.path.join(main_vardir, task_path)
if os.path.exists(rf):
color_stdout("Last 15 lines of result file [{0}]\n".format(rf),
schema='error')
lib.utils.print_tail_n(rf, num_lines=15)

self.warned_seconds_ago = 0.0
if self.inactivity < self.kill_timeout:
return
Expand Down

0 comments on commit 0483493

Please sign in to comment.