diff --git a/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/jobs_troubleshoot_plugin.py b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/jobs_troubleshoot_plugin.py index 4f22725246..fa60d1b376 100644 --- a/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/jobs_troubleshoot_plugin.py +++ b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/jobs_troubleshoot_plugin.py @@ -5,21 +5,59 @@ """ import logging from typing import List +from typing import Optional from vdk.api.plugin.hook_markers import hookimpl from vdk.api.plugin.plugin_registry import IPluginRegistry +from vdk.internal.builtin_plugins.run.job_context import JobContext from vdk.internal.core.config import ConfigurationBuilder +from vdk.plugin.jobs_troubleshoot.api.troubleshoot_utility import ITroubleshootUtility from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import add_definitions +from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.utilities_registry import ( + get_utilities_to_use, +) log = logging.getLogger(__name__) class JobTroubleshootingPlugin: + def __init__(self): + self.troubleshooting_utils: Optional[List[ITroubleshootUtility]] = [] + @staticmethod @hookimpl def vdk_configure(config_builder: ConfigurationBuilder) -> None: add_definitions(config_builder=config_builder) + @hookimpl + def initialize_job(self, context: JobContext) -> None: + self.troubleshooting_utils = get_utilities_to_use( + job_config=context.core_context.configuration + ) + try: + for util in self.troubleshooting_utils: + util.start() + except Exception as e: + log.info( + f""" + An exception occurred while processing a troubleshooting + utility. The error was: {e} + """ + ) + + @hookimpl + def finalize_job(self, context: JobContext) -> None: + try: + for util in self.troubleshooting_utils: + util.stop() + except Exception as e: + log.info( + f""" + An exception occurred while processing a troubleshooting + utility. The error was: {e} + """ + ) + @hookimpl def vdk_start(plugin_registry: IPluginRegistry, command_line_args: List) -> None: diff --git a/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/healthcheck_server.py b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/healthcheck_server.py new file mode 100644 index 0000000000..4a348b0708 --- /dev/null +++ b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/healthcheck_server.py @@ -0,0 +1,30 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from http.server import HTTPServer +from http.server import SimpleHTTPRequestHandler +from threading import Thread +from typing import Any + + +class HealthCheckServer: + def __init__(self, port: int, handler: Any = None): + if handler: + self._server = HTTPServer(("", port), handler) + else: + self._server = HTTPServer(("", port), SimpleHTTPRequestHandler) + self._thread = Thread(target=self._server.serve_forever) + + def __enter__(self): + self.start() + return self + + def __exit__(self, typ, value, traceback): + self.stop() + + def start(self): + self._thread.start() + + def stop(self): + self._server.shutdown() + self._server.server_close() + self._thread.join() diff --git a/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/thread_dump.py b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/thread_dump.py new file mode 100644 index 0000000000..ec59ab619f --- /dev/null +++ b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/thread_dump.py @@ -0,0 +1,88 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import json +import logging +import sys +import threading +import time +import traceback +from http.server import BaseHTTPRequestHandler +from threading import Lock + +from vdk.internal.core.config import Configuration +from vdk.plugin.jobs_troubleshoot.api.troubleshoot_utility import ITroubleshootUtility +from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import ( + TROUBLESHOOT_PORT_TO_USE, +) +from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.healthcheck_server import ( + HealthCheckServer, +) + +log = logging.getLogger(__name__) + + +_lock = Lock() +_time = time.time() +_timeout = 180 +_thread_dump_timeout = 60 + + +def healthcheck_refresh(): + global _time + with _lock: + _time = time.time() + + +def _set_timeout(timeout: int): + global _timeout + _timeout = timeout + + +def _check_timeout() -> float: + with _lock: + elapsed = time.time() - _time + return elapsed + + +class ThreadDumpHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/threads": + self.send_response(200) + self.wfile.write(b"making a thread dump") + log.info(f"Dumping threads") + self._log_thread_dump() + else: + self.send_error(404) + + @staticmethod + def _log_thread_dump(): + try: + for t in threading.enumerate(): + log.info( + f"Thread:{t.getName()} alive:{t.is_alive()} daemon:{t.isDaemon()}" + ) + log.info(f"--------------------------------------------------------------") + log.info(f"Dumping threads stacks:") + code = [] + for threadId, stack in sys._current_frames().items(): + code.append("\n# ThreadID: %s" % threadId) + for filename, lineno, name, line in traceback.extract_stack(stack): + code.append('File: "%s", line %d, in %s' % (filename, lineno, name)) + if line: + code.append(" %s" % (line.strip())) + log.info(f"Threads stacks:{json.dumps(code)}") + log.info(f"--------------------------------------------------------------") + except Exception as e: + log.exception(f"test_reporter_thread exception:{e}", exc_info=True) + + +class ThreadDumpUtility(ITroubleshootUtility): + def __init__(self, job_configuration: Configuration): + self.port_to_use = job_configuration.get_value(TROUBLESHOOT_PORT_TO_USE) + self.server = HealthCheckServer(self.port_to_use, ThreadDumpHandler) + + def start(self): + self.server.start() + + def stop(self): + self.server.stop() diff --git a/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/utilities_registry.py b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/utilities_registry.py new file mode 100644 index 0000000000..0dc19c02a7 --- /dev/null +++ b/projects/vdk-plugins/vdk-jobs-troubleshooting/src/vdk/plugin/jobs_troubleshoot/troubleshoot_utilities/utilities_registry.py @@ -0,0 +1,64 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +import logging +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + +from vdk.internal.core.config import Configuration +from vdk.plugin.jobs_troubleshoot.api.troubleshoot_utility import ITroubleshootUtility +from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import ( + TROUBLESHOOT_UTILITIES_TO_USE, +) +from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.thread_dump import ( + ThreadDumpUtility, +) + + +log = logging.getLogger(__name__) + + +def utilities_registry(job_config: Configuration) -> Dict[str, Any]: + """ + The troubleshooting utilities registry is where all utility objects are to + be initialized. + TODO: Come up with a more elegant approach to register utilities. + + :param job_config: The data job configuration. + :return: A dictionary with all available troubleshooting utilities. + """ + registered_utilities: Dict[str, Any] = {} + registered_utilities["thread-dump"] = ThreadDumpUtility( + job_configuration=job_config + ) + + return registered_utilities + + +def get_utilities_to_use( + job_config: Configuration, +) -> Optional[List[ITroubleshootUtility]]: + """ + Get a list of the initialized troubleshooting utilities that are specified + by the VDK_TROUBLESHOOT_UTILITIES_TO_USE configuration variable. + :param job_config: Data Job configuration + :return: A list of utility objects that are to be used. + """ + utilities: Optional[List[ITroubleshootUtility]] = [] + selected_utilities: str = job_config.get_value(TROUBLESHOOT_UTILITIES_TO_USE) + registered_utilities: Dict = utilities_registry(job_config=job_config) + + for util in selected_utilities.split(","): + if util in registered_utilities.keys(): + utilities.append(registered_utilities.get(util)) + else: + log.info( + f""" + Utility {util} is not in the list of available troubleshooting + utilities. + Available utilities: {registered_utilities.keys()} + """ + ) + + return utilities diff --git a/projects/vdk-plugins/vdk-jobs-troubleshooting/tests/test_jobs_troubleshoot.py b/projects/vdk-plugins/vdk-jobs-troubleshooting/tests/test_jobs_troubleshoot.py deleted file mode 100644 index dac242d392..0000000000 --- a/projects/vdk-plugins/vdk-jobs-troubleshooting/tests/test_jobs_troubleshoot.py +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright 2021 VMware, Inc. -# SPDX-License-Identifier: Apache-2.0 - - -def test_job_troubleshoot(): - pass diff --git a/projects/vdk-plugins/vdk-jobs-troubleshooting/tests/test_utilities_registry.py b/projects/vdk-plugins/vdk-jobs-troubleshooting/tests/test_utilities_registry.py new file mode 100644 index 0000000000..dfab41de01 --- /dev/null +++ b/projects/vdk-plugins/vdk-jobs-troubleshooting/tests/test_utilities_registry.py @@ -0,0 +1,39 @@ +# Copyright 2021 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +from typing import List + +from vdk.internal.core.config import ConfigurationBuilder +from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import ( + TROUBLESHOOT_PORT_TO_USE, +) +from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import ( + TROUBLESHOOT_UTILITIES_TO_USE, +) +from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.utilities_registry import ( + get_utilities_to_use, +) + + +def test_get_utilities_to_use(): + config_builder = ConfigurationBuilder() + config_builder.set_value(key=TROUBLESHOOT_UTILITIES_TO_USE, value="thread-dump") + config_builder.set_value(key=TROUBLESHOOT_PORT_TO_USE, value=8783) + configuration = config_builder.build() + + utility = get_utilities_to_use(configuration) + + assert isinstance(utility, List) + assert len(utility) == 1 + + +def test_get_utilities_to_use__nonexistent_utility(): + config_builder = ConfigurationBuilder() + config_builder.set_value( + key=TROUBLESHOOT_UTILITIES_TO_USE, value="non-existent-utility" + ) + config_builder.set_value(key=TROUBLESHOOT_PORT_TO_USE, value=8783) + configuration = config_builder.build() + + utility = get_utilities_to_use(configuration) + + assert len(utility) == 0