Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List tasks information for hung workers #110

Merged
merged 2 commits into from
Sep 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 53 additions & 27 deletions lib/test.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import difflib
import filecmp
import gevent
import os
import pprint
import pytap13
import re
import shutil
import sys
import time
import filecmp
import difflib
import traceback
import gevent
import pytap13
import pprint
import shutil
from functools import partial

try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO

import lib
from lib.utils import non_empty_valgrind_logs, print_tail_n
from lib.colorer import color_stdout
from lib.utils import non_empty_valgrind_logs
from lib.utils import print_tail_n


class TestExecutionError(OSError):
Expand All @@ -36,7 +38,10 @@ def _run(self, *args, **kwargs):
self.callable(*self.callable_args, **self.callable_kwargs)

def __repr__(self):
return "<TestRunGreenlet at %s info='%s'>" % (hex(id(self)), getattr(self, "info", None))
return "<TestRunGreenlet at {0} info='{1}'>".format(
hex(id(self)),
getattr(self, "info", None)
)


class FilteredStream:
Expand Down Expand Up @@ -84,28 +89,35 @@ def flush(self):
self.stream.flush()


def get_filename_by_test(postfix, test_name):
    """Return the base name of the file that accompanies a test.

    The first '.test' in ``test_name`` and everything after it are
    replaced with ``postfix``; the directory part is then dropped.

    :param postfix: replacement suffix, e.g. '.result' or '.reject'
    :param test_name: test name or path, e.g. 'box/insert.test.lua'
    :return: base name such as 'insert.result'; when ``test_name``
        contains no '.test', its base name is returned unchanged
    """
    # Raw string: '\.' inside a plain literal is an invalid escape sequence
    # that newer Python versions warn about.
    rg = re.compile(r'\.test.*')
    return os.path.basename(rg.sub(postfix, test_name))


# Shortcuts built on get_filename_by_test() with the postfix pre-bound:
# each takes a test name and returns the base name of the matching
# '.reject', '.result' or '.skipcond' file.
get_reject = partial(get_filename_by_test, '.reject')
get_result = partial(get_filename_by_test, '.result')
get_skipcond = partial(get_filename_by_test, '.skipcond')


class Test:
"""An individual test file. A test object can run itself
and remembers completion state of the run.

If the file <test_name>.skipcond exists, it will be executed before the
test, and if it sets self.skip to a true value the test will be skipped.
"""
rg = re.compile('\.test.*')

def __init__(self, name, args, suite_ini, params={}, conf_name=None):
"""Initialize test properties: path to test file, path to
temporary result file, path to the client program, test status."""
self.name = name
self.args = args
self.suite_ini = suite_ini
self.result = os.path.join(suite_ini['suite'],
os.path.basename(self.rg.sub('.result', name)))
self.skip_cond = os.path.join(suite_ini['suite'],
os.path.basename(self.rg.sub('.skipcond', name)))
self.result = os.path.join(suite_ini['suite'], get_result(name))
self.skip_cond = os.path.join(suite_ini['suite'], get_skipcond(name))
self.tmp_result = os.path.join(self.suite_ini['vardir'],
os.path.basename(self.result))
self.reject = self.rg.sub('.reject', name)
self.reject = get_reject(name)
self.is_executed = False
self.is_executed_ok = None
self.is_equal_result = None
Expand All @@ -126,7 +138,9 @@ def id(self):

def passed(self):
"""Return true if this test was run successfully."""
return self.is_executed and self.is_executed_ok and self.is_equal_result
return (self.is_executed and
self.is_executed_ok and
self.is_equal_result)

def execute(self, server):
# Note: don't forget to set 'server.current_test = self' in
Expand Down Expand Up @@ -172,8 +186,9 @@ def run(self, server):
if e.__class__.__name__ == 'TarantoolStartError':
# worker should stop
raise
color_stdout('\nTest.run() received the following error:\n' +
traceback.format_exc() + '\n', schema='error')
color_stdout('\nTest.run() received the following error:\n'
'{0}\n'.format(traceback.format_exc()),
schema='error')
diagnostics = str(e)
finally:
if sys.stdout and sys.stdout != save_stdout:
Expand All @@ -185,7 +200,8 @@ def run(self, server):
is_tap = False
if not self.skip:
if self.is_executed_ok and os.path.isfile(self.result):
self.is_equal_result = filecmp.cmp(self.result, self.tmp_result)
self.is_equal_result = filecmp.cmp(self.result,
self.tmp_result)
elif self.is_executed_ok:
if lib.Options().args.is_verbose:
color_stdout('\n')
Expand All @@ -208,12 +224,15 @@ def run(self, server):
color_stdout("[ skip ]\n", schema='test_skip')
if os.path.exists(self.tmp_result):
os.remove(self.tmp_result)
elif self.is_executed_ok and self.is_equal_result and self.is_valgrind_clean:
elif (self.is_executed_ok and
self.is_equal_result and
self.is_valgrind_clean):
short_status = 'pass'
color_stdout("[ pass ]\n", schema='test_pass')
if os.path.exists(self.tmp_result):
os.remove(self.tmp_result)
elif (self.is_executed_ok and not self.is_equal_result and not
elif (self.is_executed_ok and not
self.is_equal_result and not
os.path.isfile(self.result)) and not is_tap:
shutil.copy(self.tmp_result, self.result)
short_status = 'new'
Expand All @@ -226,9 +245,11 @@ def run(self, server):
where = ""
if not self.is_crash_reported and not self.is_executed_ok:
self.print_diagnostics(self.reject,
"Test failed! Output from reject file {}:\n".format(self.reject))
"Test failed! Output from reject file "
"{0}:\n".format(self.reject))
server.print_log(15)
where = ": test execution aborted, reason '{0}'".format(diagnostics)
where = ": test execution aborted, reason " \
"'{0}'".format(diagnostics)
elif not self.is_crash_reported and not self.is_equal_result:
self.print_unidiff()
server.print_log(15)
Expand All @@ -237,7 +258,8 @@ def run(self, server):
os.remove(self.reject)
for log_file in non_empty_logs:
self.print_diagnostics(log_file,
"Test failed! Output from log file {}:\n".format(log_file))
"Test failed! Output from log file "
"{0}:\n".format(log_file))
where = ": there were warnings in the valgrind log file(s)"
return short_status

Expand All @@ -253,7 +275,8 @@ def print_unidiff(self):
to establish the cause of a failure when .test differs
from .result."""

color_stdout("\nTest failed! Result content mismatch:\n", schema='error')
color_stdout("\nTest failed! Result content mismatch:\n",
schema='error')
with open(self.result, "r") as result:
with open(self.reject, "r") as reject:
result_time = time.ctime(os.stat(self.result).st_mtime)
Expand Down Expand Up @@ -298,7 +321,8 @@ def tap_parse_print_yaml(self, yml):
def check_tap_output(self):
""" Returns is_tap, is_ok """
if not os.path.isfile(self.tmp_result):
color_stdout('\nCannot find %s\n' % self.tmp_result, schema='error')
color_stdout('\nCannot find %s\n' % self.tmp_result,
schema='error')
self.is_crash_reported = True
return False
with open(self.tmp_result, 'r') as f:
Expand All @@ -307,7 +331,8 @@ def check_tap_output(self):
try:
tap.parse(content)
except ValueError as e:
color_stdout('\nTAP13 parse failed: %s\n' % str(e), schema='error')
color_stdout('\nTAP13 parse failed: %s\n' % str(e),
schema='error')
self.is_crash_reported = True
return False, False
is_ok = True
Expand All @@ -326,6 +351,7 @@ def check_tap_output(self):
self.tap_parse_print_yaml(test_case.yaml)
is_ok = False
if not is_ok:
color_stdout('Rejected result file: %s\n' % self.reject, schema='test_var')
color_stdout('Rejected result file: %s\n' % self.reject,
schema='test_var')
self.is_crash_reported = True
return True, is_ok
44 changes: 35 additions & 9 deletions lib/worker.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import collections
import copy
import functools
import os
import signal
import traceback
import yaml
import copy
import functools
import collections

import lib
from lib.utils import safe_makedirs
from lib.test_suite import TestSuite

from lib.colorer import color_stdout, color_log
from lib.colorer import color_log
from lib.colorer import color_stdout
from lib.tarantool_server import TarantoolServer
from lib.test import get_result
from lib.test_suite import TestSuite
from lib.utils import safe_makedirs

# Utils
#######
Expand Down Expand Up @@ -157,6 +158,19 @@ def __init__(self, worker_id, worker_name):
super(WorkerDone, self).__init__(worker_id, worker_name)


class WorkerCurrentTask(BaseWorkerMessage):
""" Provide information about current task running on worker.
It makes it possible to check the `.result` file of hung tests
and to collect information about the current tasks in parallel mode,
to show which parallel tests can affect a failed test.
"""
def __init__(self, worker_id, worker_name,
task_name, task_param, task_result_filepath):
super(WorkerCurrentTask, self).__init__(worker_id, worker_name)
self.task_name = task_name
self.task_param = task_param
self.task_result_filepath = task_result_filepath

# Worker
########

Expand All @@ -176,6 +190,13 @@ def wrap_output(self, output, log_only):
def done_marker(self):
return WorkerDone(self.id, self.name)

def current_task(self, task_id):
task_name, task_param = task_id
task_result_filepath = os.path.join(self.suite.ini['vardir'],
get_result(task_name))
return WorkerCurrentTask(self.id, self.name,
task_name, task_param, task_result_filepath)

def wrap_result(self, task_id, short_status):
return WorkerTaskResult(self.id, self.name, task_id, short_status)

Expand Down Expand Up @@ -267,7 +288,7 @@ def run_task(self, task_id):
except KeyboardInterrupt:
self.report_keyboard_interrupt()
raise
except Exception as e:
except Exception:
color_stdout(
'\nWorker "%s" received the following error; stopping...\n'
% self.name + traceback.format_exc() + '\n', schema='error')
Expand All @@ -285,6 +306,8 @@ def run_loop(self, task_queue, result_queue):
schema='test_var')
self.stop_worker(task_queue, result_queue)
break

result_queue.put(self.current_task(task_id))
short_status = self.run_task(task_id)
result_queue.put(self.wrap_result(task_id, short_status))
if not lib.Options().args.is_force and short_status == 'fail':
Expand All @@ -307,7 +330,10 @@ def run_all(self, task_queue, result_queue):

try:
self.run_loop(task_queue, result_queue)
except (KeyboardInterrupt, Exception):
except (KeyboardInterrupt, Exception) as e:
if not isinstance(e, KeyboardInterrupt) and \
not isinstance(e, VoluntaryStopException):
color_stdout('Exception: %s\n' % e, schema='error')
self.stop_worker(task_queue, result_queue, cleanup=False)

result_queue.put(self.done_marker())
Expand Down
45 changes: 36 additions & 9 deletions listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import yaml

import lib
from lib.worker import get_reproduce_file
from lib.worker import WorkerOutput, WorkerDone, WorkerTaskResult
from lib.colorer import color_stdout
from lib.worker import WorkerCurrentTask
from lib.worker import WorkerDone
from lib.worker import WorkerOutput
from lib.worker import WorkerTaskResult
from lib.worker import get_reproduce_file


class BaseWatcher(object):
Expand Down Expand Up @@ -170,36 +173,60 @@ class HangError(Exception):

class HangWatcher(BaseWatcher):
"""Terminate all workers if no output received 'no_output_times' time."""
def __init__(self, get_not_done_worker_ids, kill_all_workers, warn_timeout,
kill_timeout):
def __init__(self, get_not_done_worker_ids, kill_all_workers,
warn_timeout, kill_timeout):
self.get_not_done_worker_ids = get_not_done_worker_ids
self.kill_all_workers = kill_all_workers
self.warn_timeout = warn_timeout
self.kill_timeout = kill_timeout
self.warned_seconds_ago = 0.0
self.inactivity = 0.0
self.worker_current_task = dict()

def process_result(self, obj):
self.warned_seconds_ago = 0.0
self.inactivity = 0.0

if isinstance(obj, WorkerCurrentTask):
self.worker_current_task[obj.worker_id] = obj

def process_timeout(self, delta_seconds):
self.warned_seconds_ago += delta_seconds
self.inactivity += delta_seconds
worker_ids = self.get_not_done_worker_ids()

if self.warned_seconds_ago < self.warn_timeout:
return
color_stdout("No output during %d seconds. "
"List of workers not reporting the status: %s; "
"Will abort after %d seconds without output.\n" % (
self.inactivity, worker_ids, self.kill_timeout),
schema='test_var')

color_stdout(
"No output during {0.inactivity:.0f} seconds. "
"Will abort after {0.kill_timeout:.0f} seconds without output. "
"List of workers not reporting the status:\n".format(self),
schema='test_var')

hung_tasks = [task for worker_id, task
in self.worker_current_task.iteritems()
if worker_id in worker_ids]
for current_task in hung_tasks:
color_stdout("- [{0:03d}, {1}, {2}]\n".format(current_task.worker_id,
current_task.task_name,
current_task.task_param),
schema='test_var')
color_stdout("Last 15 lines of result file "
"[{0}]\n".format(current_task.task_result_filepath),
schema='error')
lib.utils.print_tail_n(current_task.task_result_filepath,
num_lines=15)

self.warned_seconds_ago = 0.0

if self.inactivity < self.kill_timeout:
return

color_stdout('\n[Main process] No output from workers. '
'It seems that we hang. Send SIGKILL to workers; '
'exiting...\n',
schema='test_var')
self.kill_all_workers()

raise HangError()