Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List tasks information for hung workers #110

Merged
merged 2 commits into from
Sep 4, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytap13
import pprint
import shutil
from functools import partial

try:
from cStringIO import StringIO
Expand Down Expand Up @@ -84,28 +85,35 @@ def flush(self):
self.stream.flush()


# Matches the ``.test`` marker of a test file name and everything after it
# (e.g. the ``.test.lua`` tail of ``tap.test.lua``).  Written as a raw string
# so that ``\.`` is not parsed as an (invalid) string escape, and compiled
# once instead of on every call.
_TEST_SUFFIX_RE = re.compile(r'\.test.*')


def get_filename_by_test(postfix, test_name):
    """Return the base file name of a test with its ``.test*`` tail replaced.

    Everything from the first ``.test`` in ``test_name`` up to the end of
    the string is substituted with ``postfix``; any directory part is then
    dropped via ``os.path.basename``.  A name without ``.test`` in it is
    returned with only the directory part removed.

    :param postfix: replacement for the ``.test*`` tail, e.g. ``'.result'``
    :param test_name: test name or path, e.g. ``'app-tap/tap.test.lua'``
    :return: the resulting base file name, e.g. ``'tap.result'``
    """
    return os.path.basename(_TEST_SUFFIX_RE.sub(postfix, test_name))


# Shortcuts that pre-bind the postfix of get_filename_by_test(): each one
# takes a test name and returns the base name of the matching '.reject',
# '.result' or '.skipcond' file.
get_reject = partial(get_filename_by_test, '.reject')
get_result = partial(get_filename_by_test, '.result')
get_skipcond = partial(get_filename_by_test, '.skipcond')


class Test:
"""An individual test file. A test object can run itself
and remembers completion state of the run.

If file <test_name>.skipcond is exists it will be executed before
test and if it sets self.skip to True value the test will be skipped.
"""
rg = re.compile('\.test.*')

def __init__(self, name, args, suite_ini, params={}, conf_name=None):
"""Initialize test properties: path to test file, path to
temporary result file, path to the client program, test status."""
self.name = name
self.args = args
self.suite_ini = suite_ini
self.result = os.path.join(suite_ini['suite'],
os.path.basename(self.rg.sub('.result', name)))
self.skip_cond = os.path.join(suite_ini['suite'],
os.path.basename(self.rg.sub('.skipcond', name)))
self.result = os.path.join(suite_ini['suite'], get_result(name))
self.skip_cond = os.path.join(suite_ini['suite'], get_skipcond(name))
self.tmp_result = os.path.join(self.suite_ini['vardir'],
os.path.basename(self.result))
self.reject = self.rg.sub('.reject', name)
self.reject = get_reject(name)
self.is_executed = False
self.is_executed_ok = None
self.is_equal_result = None
Expand Down
28 changes: 26 additions & 2 deletions lib/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import lib
from lib.utils import safe_makedirs
from lib.test_suite import TestSuite

from lib.test import get_result
from lib.colorer import color_stdout, color_log
from lib.tarantool_server import TarantoolServer

Expand Down Expand Up @@ -157,6 +157,19 @@ def __init__(self, worker_id, worker_name):
super(WorkerDone, self).__init__(worker_id, worker_name)


class WorkerCurrentTask(BaseWorkerMessage):
    """Message describing the task a worker is currently running.

    It carries the task name, the task parameter and the path to the
    task's result file.  This makes it possible to inspect the `.result`
    file of a hung test and, in parallel mode, to see which concurrently
    running tests could have affected a failed one.
    """

    def __init__(self, worker_id, worker_name,
                 task_name, task_param, task_result_filepath):
        super(WorkerCurrentTask, self).__init__(worker_id, worker_name)
        # Plain data holders; the three fields are independent of each other.
        self.task_result_filepath = task_result_filepath
        self.task_param = task_param
        self.task_name = task_name

# Worker
########

Expand All @@ -176,6 +189,13 @@ def wrap_output(self, output, log_only):
def done_marker(self):
    """Return a WorkerDone message built from this worker's id and name."""
    marker = WorkerDone(self.id, self.name)
    return marker

def current_task(self, task_id):
    """Build a WorkerCurrentTask message for the given task.

    ``task_id`` is unpacked as a ``(task_name, task_param)`` pair.  The
    result file path placed into the message is the result file name
    derived from the task name, located under the suite's ``vardir``.
    """
    name, param = task_id
    vardir = self.suite.ini['vardir']
    result_path = os.path.join(vardir, get_result(name))
    return WorkerCurrentTask(self.id, self.name, name, param, result_path)

def wrap_result(self, task_id, short_status):
    """Return a WorkerTaskResult message for ``task_id`` and its status."""
    message = WorkerTaskResult(self.id, self.name, task_id, short_status)
    return message

Expand Down Expand Up @@ -285,6 +305,8 @@ def run_loop(self, task_queue, result_queue):
schema='test_var')
self.stop_worker(task_queue, result_queue)
break

result_queue.put(self.current_task(task_id))
short_status = self.run_task(task_id)
result_queue.put(self.wrap_result(task_id, short_status))
if not lib.Options().args.is_force and short_status == 'fail':
Expand All @@ -307,7 +329,9 @@ def run_all(self, task_queue, result_queue):

try:
self.run_loop(task_queue, result_queue)
except (KeyboardInterrupt, Exception):
except (KeyboardInterrupt, Exception) as e:
if not isinstance(e, KeyboardInterrupt):
color_stdout('Exception: %s\n' % e, schema='error')
self.stop_worker(task_queue, result_queue, cleanup=False)

result_queue.put(self.done_marker())
Expand Down
33 changes: 29 additions & 4 deletions listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import lib
from lib.worker import get_reproduce_file
from lib.worker import WorkerOutput, WorkerDone, WorkerTaskResult
from lib.worker import WorkerOutput, WorkerDone, WorkerTaskResult, WorkerCurrentTask
from lib.colorer import color_stdout


Expand Down Expand Up @@ -178,28 +178,53 @@ def __init__(self, get_not_done_worker_ids, kill_all_workers, warn_timeout,
self.kill_timeout = kill_timeout
self.warned_seconds_ago = 0.0
self.inactivity = 0.0
self.worker_current_task = dict()

def process_result(self, obj):
    """Reset the inactivity counters and track the worker's current task.

    Both timers are zeroed for every received object.  When the object
    is a WorkerCurrentTask it is additionally stored in
    ``self.worker_current_task`` under its worker id, replacing any
    previously stored entry for that worker.
    """
    self.inactivity = 0.0
    self.warned_seconds_ago = 0.0

    if not isinstance(obj, WorkerCurrentTask):
        return
    self.worker_current_task[obj.worker_id] = obj

def process_timeout(self, delta_seconds):
self.warned_seconds_ago += delta_seconds
self.inactivity += delta_seconds
worker_ids = self.get_not_done_worker_ids()

if self.warned_seconds_ago < self.warn_timeout:
return

color_stdout("No output during %d seconds. "
"List of workers not reporting the status: %s; "
"Will abort after %d seconds without output.\n" % (
self.inactivity, worker_ids, self.kill_timeout),
"Will abort after %d seconds without output. "
"List of workers not reporting the status:\n" % (
self.inactivity, self.kill_timeout),
schema='test_var')

hung_tasks = [task for worker_id, task
in self.worker_current_task.iteritems()
if worker_id in worker_ids]
for current_task in hung_tasks:
color_stdout("[{0:03d}] {1} {2}\n".format(current_task.worker_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would look more familiar if we used the same encoding (one-line YAML) as in the reproduce files, e.g. [app-tap/tap.test.lua, null].

current_task.task_name,
current_task.task_param),
schema='test_var')
color_stdout("Last 15 lines of result file "
"[{0}]\n".format(current_task.task_result_filepath),
schema='error')
lib.utils.print_tail_n(current_task.task_result_filepath,
num_lines=15)


self.warned_seconds_ago = 0.0

if self.inactivity < self.kill_timeout:
return

color_stdout('\n[Main process] No output from workers. '
'It seems that we hang. Send SIGKILL to workers; '
'exiting...\n',
schema='test_var')
self.kill_all_workers()

raise HangError()