Skip to content

Commit

Permalink
vdk-jobs-troubleshooting: Add thread-dump utility
Browse files Browse the repository at this point in the history
This change adds the implementation of a thread-dump utility to the
vdk-jobs-troubleshooting plugin.

The utility uses an HTTP server, through which an administrator is able
to force a stack trace dump of all threads used by the Python process of
the data job. The server is bound to a port on localhost, so to get
the stack trace, one needs to be attached to the data job pod.

Testing Done: Added unit tests for the utility registry, and tested the
plugin itself locally by running a simple data job and examining the
execution logs.

Signed-off-by: Andon Andonov <[email protected]>
  • Loading branch information
doks5 committed Dec 16, 2022
1 parent bd74935 commit 3d7afdc
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,59 @@
"""
import logging
from typing import List
from typing import Optional

from vdk.api.plugin.hook_markers import hookimpl
from vdk.api.plugin.plugin_registry import IPluginRegistry
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import ConfigurationBuilder
from vdk.plugin.jobs_troubleshoot.api.troubleshoot_utility import ITroubleshootUtility
from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import add_definitions
from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.utilities_registry import (
get_utilities_to_use,
)

log = logging.getLogger(__name__)


class JobTroubleshootingPlugin:
    """
    Plugin that starts the selected troubleshooting utilities when a data job
    is initialized and stops them when the job is finalized.

    Troubleshooting is treated as best effort: any exception raised by a
    utility is logged and swallowed so that it does not affect the data job,
    and a failure in one utility does not prevent the others from being
    started or stopped.
    """

    def __init__(self):
        # Filled in by initialize_job; stays empty if that hook never runs.
        self.troubleshooting_utils: Optional[List[ITroubleshootUtility]] = []

    @staticmethod
    @hookimpl
    def vdk_configure(config_builder: ConfigurationBuilder) -> None:
        """
        Add the plugin's configuration definitions to the builder.

        :param config_builder: The configuration builder to extend.
        """
        add_definitions(config_builder=config_builder)

    @staticmethod
    def _log_utility_failure(error: Exception) -> None:
        """Log (without re-raising) an error raised by a utility."""
        log.info(
            "\n"
            "An exception occurred while processing a troubleshooting\n"
            f"utility. The error was: {error}\n"
        )

    @hookimpl
    def initialize_job(self, context: JobContext) -> None:
        """
        Resolve the utilities selected in the job configuration and start them.

        :param context: The job context; its core configuration is used to
            select the utilities.
        """
        try:
            self.troubleshooting_utils = get_utilities_to_use(
                job_config=context.core_context.configuration
            )
        except Exception as e:
            # Resolving the utilities is part of the best-effort
            # troubleshooting setup; it must not abort the job either.
            self.troubleshooting_utils = []
            self._log_utility_failure(e)

        # One try per utility: a failing utility must not block the rest.
        for util in self.troubleshooting_utils or []:
            try:
                util.start()
            except Exception as e:
                self._log_utility_failure(e)

    @hookimpl
    def finalize_job(self, context: JobContext) -> None:
        """
        Stop every utility obtained in initialize_job.

        :param context: The job context (unused).
        """
        # One try per utility so that every utility gets a chance to stop.
        for util in self.troubleshooting_utils or []:
            try:
                util.stop()
            except Exception as e:
                self._log_utility_failure(e)


@hookimpl
def vdk_start(plugin_registry: IPluginRegistry, command_line_args: List) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from http.server import HTTPServer
from http.server import SimpleHTTPRequestHandler
from threading import Thread
from typing import Any


class HealthCheckServer:
    """
    HTTP server that handles requests on a background thread.

    The listening socket is bound when the object is constructed; requests
    are only served between start() and stop(). The object can be used as a
    context manager, which starts the server on entry and stops it on exit.
    """

    def __init__(self, port: int, handler: Any = None, host: str = ""):
        """
        :param port: TCP port to listen on.
        :param handler: Request handler class. When falsy,
            SimpleHTTPRequestHandler is used.
        :param host: Address to bind to. The default "" keeps the previous
            behaviour of listening on all interfaces; pass "127.0.0.1" to
            accept connections from the local host only.
        """
        self._server = HTTPServer((host, port), handler or SimpleHTTPRequestHandler)
        # Daemon thread: a server that is never stopped (e.g. because the
        # owner crashed before calling stop()) must not keep the interpreter
        # alive at exit.
        self._thread = Thread(target=self._server.serve_forever, daemon=True)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, typ, value, traceback):
        self.stop()

    def start(self):
        """Start serving requests on the background thread."""
        self._thread.start()

    def stop(self):
        """
        Stop serving and release the listening socket.

        Safe to call when the server was never started or was already
        stopped.
        """
        # shutdown() blocks until serve_forever() returns, so calling it when
        # serve_forever() was never started would wait forever.
        if self._thread.is_alive():
            self._server.shutdown()
            self._thread.join()
        self._server.server_close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import sys
import threading
import time
import traceback
from http.server import BaseHTTPRequestHandler
from threading import Lock

from vdk.internal.core.config import Configuration
from vdk.plugin.jobs_troubleshoot.api.troubleshoot_utility import ITroubleshootUtility
from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import (
TROUBLESHOOT_PORT_TO_USE,
)
from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.healthcheck_server import (
HealthCheckServer,
)

log = logging.getLogger(__name__)


# Held by healthcheck_refresh() and _check_timeout() while they access _time.
_lock = Lock()
# time.time() value of the latest healthcheck_refresh() call; starts at the
# moment this module is imported.
_time = time.time()
# Replaced by _set_timeout(). Presumably expressed in seconds - TODO confirm.
_timeout = 180
# NOTE(review): not referenced by any code visible in this module; presumably
# a duration in seconds - confirm where it is meant to be used.
_thread_dump_timeout = 60


def healthcheck_refresh():
    """Set the module-level ``_time`` to the current time while holding ``_lock``."""
    global _time
    _lock.acquire()
    try:
        _time = time.time()
    finally:
        _lock.release()


def _set_timeout(timeout: int):
    """
    Replace the module-level ``_timeout`` value.

    :param timeout: The new value to store.
    """
    global _timeout
    _timeout = timeout


def _check_timeout() -> float:
    """
    Return how much time has passed since ``_time`` was last set.

    :return: The difference between the current ``time.time()`` and ``_time``,
        computed while holding ``_lock``.
    """
    with _lock:
        return time.time() - _time


class ThreadDumpHandler(BaseHTTPRequestHandler):
    """
    HTTP request handler that writes a dump of all threads to the log.

    A GET request to ``/threads`` is answered with a short plain-text body
    and then logs the name and stack of every thread of the current process.
    Any other path is answered with 404.
    """

    def do_GET(self):
        """Handle a GET request (see the class description for the routes)."""
        if self.path != "/threads":
            self.send_error(404)
            return

        body = b"making a thread dump"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        # send_response() only buffers the status line and headers;
        # end_headers() is what writes them out. Without it the client
        # receives a body with no HTTP status line.
        self.end_headers()
        self.wfile.write(body)
        log.info("Dumping threads")
        self._log_thread_dump()

    @staticmethod
    def _log_thread_dump():
        """
        Log every thread known to ``threading`` and the current stack of
        every thread reported by ``sys._current_frames()``.

        Errors are logged and not propagated.
        """
        try:
            for t in threading.enumerate():
                # .name / .daemon replace getName() / isDaemon(), which are
                # deprecated since Python 3.10.
                log.info(f"Thread:{t.name} alive:{t.is_alive()} daemon:{t.daemon}")
            log.info("--------------------------------------------------------------")
            log.info("Dumping threads stacks:")
            code = []
            for threadId, stack in sys._current_frames().items():
                code.append("\n# ThreadID: %s" % threadId)
                for filename, lineno, name, line in traceback.extract_stack(stack):
                    code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
                    if line:
                        code.append(" %s" % (line.strip()))
            log.info(f"Threads stacks:{json.dumps(code)}")
            log.info("--------------------------------------------------------------")
        except Exception as e:
            # log.exception() already attaches the active traceback.
            log.exception(f"test_reporter_thread exception:{e}")


class ThreadDumpUtility(ITroubleshootUtility):
    """
    Troubleshooting utility that serves ThreadDumpHandler over HTTP on the
    port configured through TROUBLESHOOT_PORT_TO_USE.
    """

    def __init__(self, job_configuration: Configuration):
        """
        :param job_configuration: The data job configuration from which the
            port is read.
        """
        self.port_to_use = job_configuration.get_value(TROUBLESHOOT_PORT_TO_USE)
        # The server is created in start() rather than here: constructing it
        # binds the port, and this object may be instantiated without ever
        # being started.
        self.server = None

    def start(self):
        """Create the HTTP server if needed and start serving requests."""
        server = self.server
        if server is None:
            server = HealthCheckServer(self.port_to_use, ThreadDumpHandler)
        server.start()
        # Stored only after a successful start so that stop() never has to
        # deal with a server that is not running.
        self.server = server

    def stop(self):
        """Stop the HTTP server; does nothing if it was never started."""
        server = self.server
        if server is None:
            return
        # Cleared first: a stopped server cannot be started again, so a later
        # start() has to create a fresh one.
        self.server = None
        server.stop()
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from vdk.internal.core.config import Configuration
from vdk.plugin.jobs_troubleshoot.api.troubleshoot_utility import ITroubleshootUtility
from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import (
TROUBLESHOOT_UTILITIES_TO_USE,
)
from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.thread_dump import (
ThreadDumpUtility,
)


log = logging.getLogger(__name__)


def utilities_registry(job_config: Configuration) -> Dict[str, Any]:
    """
    Build the registry of troubleshooting utilities. Every available utility
    object is created here and stored under the name used to select it.
    TODO: Come up with a more elegant approach to register utilities.
    :param job_config: The data job configuration.
    :return: A dictionary mapping utility names to utility objects.
    """
    return {
        "thread-dump": ThreadDumpUtility(job_configuration=job_config),
    }


def get_utilities_to_use(
    job_config: Configuration,
) -> Optional[List[ITroubleshootUtility]]:
    """
    Get a list of the initialized troubleshooting utilities that are specified
    by the VDK_TROUBLESHOOT_UTILITIES_TO_USE configuration variable.

    The value is a comma-separated list of utility names. Whitespace around a
    name is ignored, empty entries are skipped and a name listed more than
    once is used only once. Names that are not registered are logged and
    skipped.
    :param job_config: Data Job configuration
    :return: A list of utility objects that are to be used; empty when no
        utility is selected.
    """
    utilities: List[ITroubleshootUtility] = []
    selected_utilities = job_config.get_value(TROUBLESHOOT_UTILITIES_TO_USE)
    if not selected_utilities:
        # Nothing selected (unset or empty value): do not even build the
        # registry, since that instantiates every utility.
        return utilities

    registered_utilities: Dict = utilities_registry(job_config=job_config)

    already_selected = set()
    for raw_name in selected_utilities.split(","):
        util = raw_name.strip()
        # The registry holds a single instance per name, so returning it
        # twice would make the caller start the same utility twice.
        if not util or util in already_selected:
            continue
        already_selected.add(util)

        if util in registered_utilities:
            utilities.append(registered_utilities[util])
        else:
            log.info(
                "\n"
                f"Utility {util} is not in the list of available troubleshooting\n"
                "utilities.\n"
                f"Available utilities: {registered_utilities.keys()}\n"
            )

    return utilities

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import List

from vdk.internal.core.config import ConfigurationBuilder
from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import (
TROUBLESHOOT_PORT_TO_USE,
)
from vdk.plugin.jobs_troubleshoot.troubleshoot_configuration import (
TROUBLESHOOT_UTILITIES_TO_USE,
)
from vdk.plugin.jobs_troubleshoot.troubleshoot_utilities.utilities_registry import (
get_utilities_to_use,
)


def test_get_utilities_to_use():
    """Selecting the registered "thread-dump" utility yields a one-item list."""
    builder = ConfigurationBuilder()
    settings = (
        (TROUBLESHOOT_UTILITIES_TO_USE, "thread-dump"),
        (TROUBLESHOOT_PORT_TO_USE, 8783),
    )
    for key, value in settings:
        builder.set_value(key=key, value=value)

    selected = get_utilities_to_use(builder.build())

    assert isinstance(selected, List)
    assert len(selected) == 1


def test_get_utilities_to_use__nonexistent_utility():
    """Selecting a name that is not registered yields an empty list."""
    builder = ConfigurationBuilder()
    settings = (
        (TROUBLESHOOT_UTILITIES_TO_USE, "non-existent-utility"),
        (TROUBLESHOOT_PORT_TO_USE, 8783),
    )
    for key, value in settings:
        builder.set_value(key=key, value=value)

    selected = get_utilities_to_use(builder.build())

    assert len(selected) == 0

0 comments on commit 3d7afdc

Please sign in to comment.