Skip to content

Commit

Permalink
Improve the logging tests and add support for resetting the logger (#…
Browse files Browse the repository at this point in the history
…1716)

- Allows the logger to be "reset" which is necessary to avoid duplicate logging handlers if `configure_logging` is called multiple times
- Updates all of the logging tests to parameterize the log level and better check if the handlers are actually being called.


## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)

URL: #1716
  • Loading branch information
mdemoret-nv authored Jun 4, 2024
1 parent d9e6474 commit 21c1694
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 120 deletions.
100 changes: 67 additions & 33 deletions morpheus/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import re
import warnings
import weakref
from enum import Enum

import appdirs
Expand Down Expand Up @@ -116,49 +117,82 @@ def _configure_from_log_level(*extra_handlers: logging.Handler, log_level: int):
# Get the root Morpheus logger
morpheus_logger = logging.getLogger("morpheus")

# Set the level here
set_log_level(log_level=log_level)
# Prevent reconfiguration if called again
if (not getattr(morpheus_logger, "_configured_by_morpheus", False)):
setattr(morpheus_logger, "_configured_by_morpheus", True)

# Don't propagate upstream
morpheus_logger.propagate = False
morpheus_logging_queue = multiprocessing.Queue()
# Set the level here
set_log_level(log_level=log_level)

# This needs to be the only handler for the morpheus logger
morpheus_queue_handler = logging.handlers.QueueHandler(morpheus_logging_queue)
# Don't propagate upstream
morpheus_logger.propagate = False
morpheus_logging_queue = multiprocessing.Queue()

# At this point, any morpheus logger will propagate upstream to the morpheus root and then be handled by the queue
# handler
morpheus_logger.addHandler(morpheus_queue_handler)
# This needs to be the only handler for the morpheus logger
morpheus_queue_handler = logging.handlers.QueueHandler(morpheus_logging_queue)

log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "morpheus.log")
# At this point, any morpheus logger will propagate upstream to the morpheus root and then be handled by the
# queue handler
morpheus_logger.addHandler(morpheus_queue_handler)

# Ensure the log directory exists
os.makedirs(os.path.dirname(log_file), exist_ok=True)
log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "morpheus.log")

# Now we build all of the handlers for the queue listener
file_handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=5, maxBytes=1000000)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
logging.Formatter('%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}'))
# Ensure the log directory exists
os.makedirs(os.path.dirname(log_file), exist_ok=True)

# Tqdm stream handler (avoids messing with progress bars)
console_handler = TqdmLoggingHandler()
# Now we build all of the handlers for the queue listener
file_handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=5, maxBytes=1000000)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
logging.Formatter('%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}'))

# Build and run the queue listener to actually process queued messages
queue_listener = logging.handlers.QueueListener(morpheus_logging_queue,
console_handler,
file_handler,
*extra_handlers,
respect_handler_level=True)
queue_listener.start()
queue_listener._thread.name = "Logging Thread"
# Tqdm stream handler (avoids messing with progress bars)
console_handler = TqdmLoggingHandler()

# Register a function to kill the listener thread before shutting down. Prevents an error on interpreter close
def stop_queue_listener():
queue_listener.stop()
# Build and run the queue listener to actually process queued messages
queue_listener = logging.handlers.QueueListener(morpheus_logging_queue,
console_handler,
file_handler,
*extra_handlers,
respect_handler_level=True)
queue_listener.start()
queue_listener._thread.name = "Logging Thread"

import atexit
atexit.register(stop_queue_listener)
# Register a function to kill the listener thread when the queue_handler is removed.
weakref.finalize(morpheus_queue_handler, queue_listener.stop)

# Register a handler before shutting down to remove all log handlers, this ensures that the weakref.finalize
# handler we just defined is called at exit.
import atexit
atexit.register(reset_logging)
else:
raise RuntimeError("Logging has already been configured. Use `set_log_level` to change the log level or reset "
"the logging system by calling `reset_logging`.")


def reset_logging(logger_name: str = "morpheus"):
    """
    Resets the Morpheus logging system. This will flush, close and remove all handlers from the requested logger and
    clear the `_configured_by_morpheus` marker so that logging can be configured again. Removing the queue handler
    is what allows the queue listener to be stopped. This is useful for testing where the logging system needs to be
    reconfigured multiple times or reconfigured with different settings.

    Calling this function on a logger that has no handlers, or that was never configured, is a no-op.

    Parameters
    ----------
    logger_name : str, optional
        Name of the logger to reset, by default "morpheus". Passing `None` resets the root logger, since
        `logging.getLogger(None)` returns the root logger.
    """

    morpheus_logger = logging.getLogger(logger_name)

    # Iterate over a copy since `removeHandler` mutates `morpheus_logger.handlers`
    for handler in morpheus_logger.handlers.copy():
        # Adapted from `logging.shutdown`. The lock is acquired outside of the `try` so that `release` is only ever
        # paired with a successful `acquire`.
        handler.acquire()
        try:
            handler.flush()
            handler.close()
        except (OSError, ValueError):
            # Best effort, the underlying stream may already be closed. Same errors `logging.shutdown` ignores.
            pass
        finally:
            handler.release()
            # Always detach the handler, even if closing it raised an unexpected error, so that a half-closed
            # handler is never left attached to the logger.
            morpheus_logger.removeHandler(handler)

    # Allow the logger to be configured again
    try:
        delattr(morpheus_logger, "_configured_by_morpheus")
    except AttributeError:
        # The logger was never configured (or has already been reset)
        pass


def configure_logging(*extra_handlers: logging.Handler, log_level: int = None, log_config_file: str = None):
Expand Down
5 changes: 5 additions & 0 deletions tests/benchmarks/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
from test_bench_e2e_pipelines import E2E_TEST_CONFIGS


@pytest.fixture(autouse=True)
def reset_logging_fixture(reset_logging): # pylint: disable=unused-argument
yield


# pylint: disable=unused-argument
def pytest_benchmark_update_json(config, benchmarks, output_json):

Expand Down
14 changes: 8 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,18 @@ def should_filter_test(item: pytest.Item):
items[:] = [x for x in items if should_filter_test(x)]


def clear_handlers(logger):
handlers = logger.handlers.copy()
for handler in handlers:
logger.removeHandler(handler)
@pytest.fixture(scope="function", name="reset_logging")
def reset_logging_fixture():
from morpheus.utils.logger import reset_logging
reset_logging()
yield


@pytest.hookimpl(trylast=True)
def pytest_runtest_teardown(item, nextitem):
clear_handlers(logging.getLogger("morpheus"))
clear_handlers(logging.getLogger())
from morpheus.utils.logger import reset_logging
reset_logging(logger_name="morpheus")
reset_logging(logger_name=None) # Reset the root logger as well


# This fixture will be used by all tests.
Expand Down
27 changes: 9 additions & 18 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,28 +144,19 @@ def config_warning_fixture():
@pytest.mark.use_python
class TestCLI:

def test_help(self):
@pytest.mark.parametrize('cmd',
[[], ['tools'], ['run'], ['run', 'pipeline-ae'], ['run', 'pipeline-fil'],
['run', 'pipeline-nlp'], ['run', 'pipeline-other']])
def test_help(self, cmd: list[str]):
runner = CliRunner()
result = runner.invoke(commands.cli, ['--help'])
result = runner.invoke(commands.cli, cmd + ['--help'])
assert result.exit_code == 0, result.output

result = runner.invoke(commands.cli, ['tools', '--help'])
assert result.exit_code == 0, result.output

result = runner.invoke(commands.cli, ['run', '--help'])
assert result.exit_code == 0, result.output

result = runner.invoke(commands.cli, ['run', 'pipeline-ae', '--help'])
assert result.exit_code == 0, result.output

def test_autocomplete(self, tmp_path):
@pytest.mark.parametrize('cmd',
[['tools', 'autocomplete', 'show'], ['tools', 'autocomplete', 'install', '--shell=bash']])
def test_autocomplete(self, tmp_path, cmd: list[str]):
runner = CliRunner()
result = runner.invoke(commands.cli, ['tools', 'autocomplete', 'show'], env={'HOME': str(tmp_path)})
assert result.exit_code == 0, result.output

# The actual results of this are specific to the implementation of click_completion
result = runner.invoke(commands.cli, ['tools', 'autocomplete', 'install', '--shell=bash'],
env={'HOME': str(tmp_path)})
result = runner.invoke(commands.cli, cmd, env={'HOME': str(tmp_path)})
assert result.exit_code == 0, result.output

@pytest.mark.usefixtures("restore_environ")
Expand Down
9 changes: 2 additions & 7 deletions tests/test_dfp_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import typing
from io import StringIO
Expand Down Expand Up @@ -44,20 +43,16 @@
from morpheus.stages.preprocess import train_ae_stage
from morpheus.utils.compare_df import compare_df
from morpheus.utils.file_utils import load_labels_file
from morpheus.utils.logger import configure_logging

if (typing.TYPE_CHECKING):
from kafka import KafkaConsumer

configure_logging(log_level=logging.DEBUG)
# End-to-end test intended to imitate the dfp validation test


@pytest.mark.kafka
@pytest.mark.slow
@pytest.mark.use_python
@pytest.mark.reload_modules([commands, preprocess_ae_stage, train_ae_stage])
@pytest.mark.usefixtures("reload_modules")
@pytest.mark.usefixtures("reload_modules", "loglevel_debug")
@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
def test_dfp_roleg(mock_ae: mock.MagicMock,
dataset_pandas: DatasetManager,
Expand Down Expand Up @@ -161,7 +156,7 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
@pytest.mark.slow
@pytest.mark.use_python
@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
@pytest.mark.usefixtures("reload_modules")
@pytest.mark.usefixtures("reload_modules", "loglevel_debug")
@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
def test_dfp_user123(mock_ae: mock.MagicMock,
dataset_pandas: DatasetManager,
Expand Down
Loading

0 comments on commit 21c1694

Please sign in to comment.