Skip to content

Commit

Permalink
List tasks for hung workers
Browse files Browse the repository at this point in the history
If HungListener finds hung tests, it prints the following information about each of them:
- worker id
- test name
- test params
- last 15 lines from .result file

Closes #107
  • Loading branch information
Sergei Voronezhskii committed Aug 17, 2018
1 parent 822eed3 commit 974dbcc
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 12 deletions.
20 changes: 14 additions & 6 deletions lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytap13
import pprint
import shutil
from functools import partial

try:
from cStringIO import StringIO
Expand Down Expand Up @@ -84,28 +85,35 @@ def flush(self):
self.stream.flush()


def get_filename_by_test(postfix, test_name):
    """Return the base file name derived from a test name.

    Everything from the first ``.test`` occurrence up to the end of the
    line is replaced with ``postfix``; the directory part of the result
    is then dropped. A name without ``.test`` keeps its suffix and only
    loses its directory part.

    :param postfix: replacement suffix, e.g. ``'.result'``
    :param test_name: test name or path, e.g. ``'box/foo.test.lua'``
    :return: base name with the suffix replaced, e.g. ``'foo.result'``
    """
    # Raw string: '\.' inside a plain literal is an invalid escape
    # sequence and triggers a warning on modern Python versions.
    rg = re.compile(r'\.test.*')
    return os.path.basename(rg.sub(postfix, test_name))


# Convenience helpers with the suffix pre-bound: each one takes a test name
# and returns the base name of the matching .reject / .result / .skipcond
# file, as computed by get_filename_by_test().
get_reject = partial(get_filename_by_test, '.reject')
get_result = partial(get_filename_by_test, '.result')
get_skipcond = partial(get_filename_by_test, '.skipcond')


class Test:
"""An individual test file. A test object can run itself
and remembers completion state of the run.
If file <test_name>.skipcond is exists it will be executed before
test and if it sets self.skip to True value the test will be skipped.
"""
rg = re.compile('\.test.*')

def __init__(self, name, args, suite_ini, params={}, conf_name=None):
"""Initialize test properties: path to test file, path to
temporary result file, path to the client program, test status."""
self.name = name
self.args = args
self.suite_ini = suite_ini
self.result = os.path.join(suite_ini['suite'],
os.path.basename(self.rg.sub('.result', name)))
self.skip_cond = os.path.join(suite_ini['suite'],
os.path.basename(self.rg.sub('.skipcond', name)))
self.result = os.path.join(suite_ini['suite'], get_result(name))
self.skip_cond = os.path.join(suite_ini['suite'], get_skipcond(name))
self.tmp_result = os.path.join(self.suite_ini['vardir'],
os.path.basename(self.result))
self.reject = self.rg.sub('.reject', name)
self.reject = get_reject(name)
self.is_executed = False
self.is_executed_ok = None
self.is_equal_result = None
Expand Down
28 changes: 26 additions & 2 deletions lib/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import lib
from lib.utils import safe_makedirs
from lib.test_suite import TestSuite

from lib.test import get_result
from lib.colorer import color_stdout, color_log
from lib.tarantool_server import TarantoolServer

Expand Down Expand Up @@ -157,6 +157,19 @@ def __init__(self, worker_id, worker_name):
super(WorkerDone, self).__init__(worker_id, worker_name)


class WorkerCurrentTask(BaseWorkerMessage):
    """Message describing the task currently running on a worker.

    It carries the task name, the task parameter and the path to the
    task's `.result` file, which makes it possible to inspect the result
    file of a hung test. The same information can be collected in
    parallel mode to show which concurrently running tests may have
    affected a failed one.
    """

    def __init__(self, worker_id, worker_name,
                 task_name, task_param, task_result_filepath):
        super(WorkerCurrentTask, self).__init__(worker_id, worker_name)
        # Task description; stored as passed in, without any processing.
        self.task_result_filepath = task_result_filepath
        self.task_param = task_param
        self.task_name = task_name

# Worker
########

Expand All @@ -176,6 +189,13 @@ def wrap_output(self, output, log_only):
def done_marker(self):
return WorkerDone(self.id, self.name)

def current_task(self, task_id):
    """Build a WorkerCurrentTask message for the given task.

    ``task_id`` is unpacked as a ``(task_name, task_param)`` pair. The
    result file path is the task's `.result` base name placed under the
    suite's ``vardir`` directory.
    """
    name, param = task_id
    vardir = self.suite.ini['vardir']
    result_path = os.path.join(vardir, get_result(name))
    return WorkerCurrentTask(self.id, self.name, name, param, result_path)

def wrap_result(self, task_id, short_status):
return WorkerTaskResult(self.id, self.name, task_id, short_status)

Expand Down Expand Up @@ -285,6 +305,8 @@ def run_loop(self, task_queue, result_queue):
schema='test_var')
self.stop_worker(task_queue, result_queue)
break

result_queue.put(self.current_task(task_id))
short_status = self.run_task(task_id)
result_queue.put(self.wrap_result(task_id, short_status))
if not lib.Options().args.is_force and short_status == 'fail':
Expand All @@ -307,7 +329,9 @@ def run_all(self, task_queue, result_queue):

try:
self.run_loop(task_queue, result_queue)
except (KeyboardInterrupt, Exception):
except (KeyboardInterrupt, Exception) as e:
if not isinstance(e, KeyboardInterrupt):
color_stdout('Exception: %s\n' % e, schema='error')
self.stop_worker(task_queue, result_queue, cleanup=False)

result_queue.put(self.done_marker())
Expand Down
35 changes: 31 additions & 4 deletions listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import lib
from lib.worker import get_reproduce_file
from lib.worker import WorkerOutput, WorkerDone, WorkerTaskResult
from lib.worker import WorkerOutput, WorkerDone, WorkerTaskResult, WorkerCurrentTask
from lib.colorer import color_stdout


Expand Down Expand Up @@ -178,28 +178,55 @@ def __init__(self, get_not_done_worker_ids, kill_all_workers, warn_timeout,
self.kill_timeout = kill_timeout
self.warned_seconds_ago = 0.0
self.inactivity = 0.0
self.worker_current_task = dict()

def process_result(self, obj):
    """Handle a message received from a worker.

    Any message resets both inactivity timers. A WorkerCurrentTask
    message is remembered as the worker's current task; a WorkerDone
    message drops the record for that worker, if there is one.
    """
    self.inactivity = 0.0
    self.warned_seconds_ago = 0.0

    if isinstance(obj, WorkerCurrentTask):
        self.worker_current_task[obj.worker_id] = obj

    if isinstance(obj, WorkerDone):
        # A worker may finish without ever having reported a task.
        self.worker_current_task.pop(obj.worker_id, None)


def process_timeout(self, delta_seconds):
    """Account for ``delta_seconds`` that passed without a worker message.

    Both timers are advanced by ``delta_seconds``. Once at least
    ``warn_timeout`` seconds have accumulated since the last warning, a
    warning is printed, followed by one entry per worker with a recorded
    current task: worker id, task name, task parameter and the last 15
    lines of the task's result file. When the total inactivity reaches
    ``kill_timeout``, all workers are killed.

    :param delta_seconds: time elapsed since the previous call or message
    :raises HangError: after the workers were killed due to inactivity
    """
    self.warned_seconds_ago += delta_seconds
    self.inactivity += delta_seconds
    # NOTE(review): the id list is no longer part of the warning text;
    # the call is kept in case the callback has side effects -- confirm
    # against the caller and drop it if it is a pure getter.
    worker_ids = self.get_not_done_worker_ids()

    if self.warned_seconds_ago < self.warn_timeout:
        return

    # Two format specifiers, two arguments: the per-worker details are
    # printed below rather than interpolated into this header.
    color_stdout("No output during %d seconds. "
                 "Will abort after %d seconds without output. "
                 "List of workers not reporting the status:\n" % (
                     self.inactivity, self.kill_timeout),
                 schema='test_var')

    # Iterate over sorted worker ids: works on both Python 2 and 3
    # (no itervalues()) and keeps the report order deterministic.
    for worker_id in sorted(self.worker_current_task):
        current_task = self.worker_current_task[worker_id]
        color_stdout("[{0:03d}] {1} {2}\n".format(current_task.worker_id,
                                                  current_task.task_name,
                                                  current_task.task_param),
                     schema='test_var')
        color_stdout("Last 15 lines of result file "
                     "[{0}]\n".format(current_task.task_result_filepath),
                     schema='error')
        lib.utils.print_tail_n(current_task.task_result_filepath,
                               num_lines=15)

    # A warning was just issued; start counting towards the next one.
    self.warned_seconds_ago = 0.0

    if self.inactivity < self.kill_timeout:
        return

    color_stdout('\n[Main process] No output from workers. '
                 'It seems that we hang. Send SIGKILL to workers; '
                 'exiting...\n',
                 schema='test_var')
    self.kill_all_workers()

    raise HangError()

0 comments on commit 974dbcc

Please sign in to comment.