From ce31b2683e38a0748d042c5bd18e57bfdd252c18 Mon Sep 17 00:00:00 2001
From: Rahul Huilgol
Date: Fri, 11 Oct 2019 16:37:51 -0700
Subject: [PATCH] Write support for tensorboard (#252)

* Save histograms for weights and gradients
* Use standard TF summary function
* undo line break changes
* fix cases when bool tensor was being passed to add_histogram, and fix tests
* Fix region bug and update tb_writer construction
* Include summaries if any write_histogram was set to True
* Refactor writers in core
* set default step to 0
* Use new writer in hook
* Cherry picking change of refactor writers
* set default step to 0
* remove histogram related stuff
* rename IndexUtil
* Fix imports
* remove import of re
* Fix import of summary proto
* Fix step usage in writers
* Fix step usage by event file writer
* Remove directory in tensorboard directory, and add collection name as prefix for summaries created
* Fix import errors
* Fix resnet example which did not have str2bool args
* Fix core test
* Fix core test
* Indentation and move some code to a new function
* Merged Vikas' branch on tb data read
* Add untested support to read tensorboard data
* Write mode and mode_step for summaries, and fix the error of multiple global steps being assigned to same train step
* remove unnecessary file
* remove test script
* Remove changes to imagenet script
* working scalars
* Change path of tornasole event files
* Have new index file per mode for tensorboard events
* Move tensor values to different file
* move to outside tensors folder
* Change frequencies for tf examples
* Introduce CollectionKeys
* Merging export as json
* Make histogram a reduction config property, and add save_raw_tensor field to reduction config. Verified the usage for tensorflow. Also some cleanup with respect to save config in save manager
* Fix bug in loading collections
* Fix writing tensorboard data in global mode
* Add graph support to pytorch models. Copied some new protos, and a couple of files from torch.tensorboard.
* Working graph export for mxnet
* Save graph correctly for mxnet
* undo utils change worker pid
* fix import
* fix import
* do not flush index writer
* remove data files
* Fix save config issue
* make save_histogram a property of collection
* Fix save config bugs, and add scalar support to TF
* Skip summaries whose tensors are unreachable in graph, and avoid adding histogram when original collection is not included
* Move histogram creation to writer instead of event_file_writer, refactor should_save_collection in save manager, add save_scalar methods to MXNet and Pytorch
* WIP tensor scalar support
* undo add of data
* remove test
* use correct writer
* Make saving scalars work, and added type checks
* Writing scalars and tensors supported. tested in tensorboard. need to test through trials
* WIP testing steps
* remove save scalar and tensor for now because of step number issues. work on trial loading tensorboard data and come back to this
* Working reads in non index mode
* Tensorboard reads working with indexing
* cleanup index file location function
* Make pytorch tests working
* Reduce length of test_estimator_modes, and add tf tensorboard test
* Add basic scalar summary test
* Untested completed reads of tensorboard data
* Add more tensorboard tests for trial
* fix test when reading event files for tensorboard from s3
* Fixed a reduction test
* Fix reduction test in TF
* Fix merge of a test
* fix logger import, and default save/reduction config in save manager
* Fix reduction save_raw_tensor in TF
* Some cleanup of prepare and collection includes
* fix tf tests
* Fix all tests
* Add tensorboard index test
* Fix tensorboard test wrt optimizer_variables
* not save histogram for strings
* remove when nan support
* add hash
* Fix collection checks in xgboost
* add xgboost tests
* Typo
* Update hook.py (#243)
* reduce length of test and add / to prefix
* WIP move to tornasole hist summaries for TF
* Change collections_to_save_for_step, make TF use custom histograms, refactor to _save_tensor method for all frameworks
* rename to save_for_tensor
* undo some files
* undo some files
* Update tests.sh
* remove pytorch graph support
* remove mxnet graph support
* cleanup
* remove tf tensorboard duplicated test
* Fix bug of tb writer not being closed after exporting graph
* WIP fixing tests
* Remove read changes
* fix value types remaining in code
* fix tests
* catch exception when nan
* use make_numpy_array for xgboost
* Fix xgboost error where collections_in_set was empty but not none
* change log
* remove summary collections
* tweak dry run behavior
* Fix dry run flag
* undo move of steps to own file
* Delete steps.py
* fix import
* fix import in test
* cleanup
* remove index for tensorboard data
* Address review comments
* Update hook.py
---
 tests/analysis/exceptions/test_exceptions.py |   8 +-
 tests/core/test_index.py                     |   6 +-
 .../tensorflow/hooks/test_estimator_modes.py | 114 +++---
 tests/tensorflow/hooks/test_losses.py        |   6 +-
 tests/tensorflow/hooks/test_reductions.py    |  26 +-
 tests/tensorflow/hooks/test_save_all_full.py |  10 +-
 tests/tensorflow/hooks/test_save_config.py   |   2 +-
 .../tensorflow/hooks/test_save_reductions.py |   5 +-
 tests/tensorflow/hooks/test_training_end.py  |   2 +-
 tests/tensorflow/hooks/test_write.py         |   4 +-
 tornasole/core/collection.py                 |  34 +-
 tornasole/core/hook.py                       | 340 ++++++++++++------
 tornasole/core/index_reader.py               |   7 +-
 tornasole/core/locations.py                  | 104 ++++--
 tornasole/core/reader.py                     |   8 +-
 tornasole/core/s3_utils.py                   |   1 -
 tornasole/core/tensor.py                     |  17 +-
 tornasole/core/tfevent/event_file_reader.py  |  52 ++-
 tornasole/core/tfevent/event_file_writer.py  |  63 +---
 tornasole/core/tfevent/summary.py            | 148 ++++++++
 tornasole/core/tfevent/util.py               |  13 +
 tornasole/core/tfrecord/record_reader.py     |  13 +-
 tornasole/core/tfrecord/tensor_reader.py     |  49 ++-
 tornasole/core/utils.py                      |   8 +-
 tornasole/core/writer.py                     | 121 +++++--
 tornasole/mxnet/hook.py                      |  58 ++-
 tornasole/pytorch/hook.py                    |  57 ++-
 tornasole/tensorflow/collection.py           |  10 +-
 tornasole/tensorflow/hook.py                 | 203 ++++++-----
 tornasole/tensorflow/keras.py                |  23 +-
 tornasole/tensorflow/utils.py                |   4 -
 tornasole/trials/local_trial.py              |  12 +-
 tornasole/trials/s3_trial.py                 |   4 +-
 tornasole/trials/trial.py                    |  12 +-
 tornasole/xgboost/hook.py                    |  30 +-
 35 files changed, 1032 insertions(+), 542 deletions(-)
 create mode 100644 tornasole/core/tfevent/summary.py

diff --git a/tests/analysis/exceptions/test_exceptions.py
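The headline change is that raw-tensor saving and histogram writing are now configured per reduction config and per collection. A minimal sketch of that usage, assuming the tornasole.tensorflow entry points exercised by the tests in this patch (paths and values below are illustrative, not part of the diff):

```python
import tornasole.tensorflow as ts
from tornasole.core.reduction_config import ReductionConfig

hook = ts.TornasoleHook(
    out_dir='/tmp/tornasole_tb_demo',                # hypothetical output path
    save_config=ts.SaveConfig(save_interval=2),
    # save_raw_tensor=True keeps full tensors alongside the requested norms;
    # when reduction_config is omitted, the hook now defaults to raw tensors.
    reduction_config=ReductionConfig(norms=['l2'], save_raw_tensor=True))

# save_histogram is now a per-collection flag; losses, scalars and
# tensorflow_summaries have it forced off, other collections write
# TensorBoard histograms for every saved step.
ts.get_collection('weights').save_histogram = True
```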
b/tests/analysis/exceptions/test_exceptions.py index 2c766739ba..da41f87825 100644 --- a/tests/analysis/exceptions/test_exceptions.py +++ b/tests/analysis/exceptions/test_exceptions.py @@ -1,9 +1,11 @@ +import shutil import pytest import uuid from tests.analysis.utils import generate_data from tornasole.trials import create_trial from tornasole.exceptions import * import boto3 as boto3 +import os def del_s3(bucket, file_path): s3_client = boto3.client('s3') @@ -12,8 +14,7 @@ def del_s3(bucket, file_path): @pytest.mark.slow # 0:40 to run def test_refresh_tensors(): trial_name = str(uuid.uuid4()) - path = 's3://tornasolecodebuildtest/rules/tensors/ts_output/train/' - bucket = 'tornasolecodebuildtest' + path = '/tmp/tornasole_analysis_tests/test_refresh_tensors/' num_steps = 8 num_tensors = 10 for i in range(num_steps): @@ -29,7 +30,6 @@ def test_refresh_tensors(): assert False except TensorUnavailable: pass - del_s3(bucket, file_path=path) assert tr.tensor('foo_1') is not None assert tr.tensor('foo_1').value(num_steps - 1) is not None @@ -63,3 +63,5 @@ def test_refresh_tensors(): assert False except StepNotYetAvailable: pass + + shutil.rmtree(os.path.join(path, trial_name)) \ No newline at end of file diff --git a/tests/core/test_index.py b/tests/core/test_index.py index b663e9a151..e9f82da775 100644 --- a/tests/core/test_index.py +++ b/tests/core/test_index.py @@ -1,6 +1,6 @@ from tornasole.core.writer import FileWriter from tornasole.core.reader import FileReader -from tornasole.core.locations import EventFileLocation, IndexFileLocationUtils +from tornasole.core.locations import TensorFileLocation, IndexFileLocationUtils import shutil import os import numpy as np @@ -21,8 +21,8 @@ def test_index(): writer.write_tensor(tdata=numpy_tensor[i], tname=n) writer.flush() writer.close() - efl = EventFileLocation(step_num=step, worker_name=worker) - eventfile = efl.get_location(trial_dir=run_dir) + efl = TensorFileLocation(step_num=step, worker_name=worker) + eventfile = efl.get_file_location(trial_dir=run_dir) indexfile = IndexFileLocationUtils.get_index_key_for_step(run_dir, step, worker) fo = open(eventfile, "rb") diff --git a/tests/tensorflow/hooks/test_estimator_modes.py b/tests/tensorflow/hooks/test_estimator_modes.py index fab5d522d6..6a0a9ba7ee 100644 --- a/tests/tensorflow/hooks/test_estimator_modes.py +++ b/tests/tensorflow/hooks/test_estimator_modes.py @@ -9,7 +9,6 @@ Integration tests with S3 take 95% of the time. 
""" - import pytest import tensorflow.compat.v1 as tf import numpy as np @@ -23,9 +22,11 @@ from tornasole.tensorflow import reset_collections from tornasole.tensorflow.hook import TornasoleHook from tornasole.trials import create_trial +from tornasole.core.utils import is_s3 from tests.analysis.utils import delete_s3_prefix -def help_test_mnist(path, save_config=None, hook=None, set_modes=True): +def help_test_mnist(path, save_config=None, hook=None, set_modes=True, + num_train_steps=20, num_eval_steps=10): trial_dir = path tf.reset_default_graph() if hook is None: @@ -135,31 +136,44 @@ def train(num_steps): if set_modes: hook.set_mode(ts.modes.TRAIN) # train one step and display the probabilties - train(2) + train(num_train_steps/2) if set_modes: hook.set_mode(ts.modes.EVAL) mnist_classifier.evaluate(input_fn=eval_input_fn, - steps=3, + steps=num_eval_steps, hooks=[hook]) if set_modes: hook.set_mode(ts.modes.TRAIN) - train(2) + train(num_train_steps/2) return train -@pytest.mark.slow # 0:02 to run -def test_mnist_local(): - run_id = 'trial_' + datetime.now().strftime('%Y%m%d-%H%M%S%f') - trial_dir = os.path.join(TORNASOLE_TF_HOOK_TESTS_DIR, run_id) - help_test_mnist(trial_dir, ts.SaveConfig(save_interval=2)) +def helper_test_mnist_trial(trial_dir): tr = create_trial(trial_dir) - assert len(tr.available_steps()) == 4 + assert len(tr.available_steps()) == 3 assert len(tr.available_steps(mode=ts.modes.TRAIN)) == 2 - assert len(tr.available_steps(mode=ts.modes.EVAL)) == 2 + assert len(tr.available_steps(mode=ts.modes.EVAL)) == 1 assert len(tr.tensors()) == 17 - shutil.rmtree(trial_dir) + on_s3, bucket, prefix = is_s3(trial_dir) + if not on_s3: + shutil.rmtree(trial_dir, ignore_errors=True) + else: + delete_s3_prefix(bucket, prefix) + +@pytest.mark.slow # 0:02 to run +def test_mnist(on_s3=False): + run_id = 'trial_' + datetime.now().strftime('%Y%m%d-%H%M%S%f') + if on_s3: + bucket = 'tornasole-testing' + prefix = 'tornasole_tf/hooks/estimator_modes/' + run_id + trial_dir = f's3://{bucket}/{prefix}' + else: + trial_dir = os.path.join(TORNASOLE_TF_HOOK_TESTS_DIR, run_id) + help_test_mnist(trial_dir, save_config=ts.SaveConfig(save_interval=2), + num_train_steps=4, num_eval_steps=2) + helper_test_mnist_trial(trial_dir) @pytest.mark.slow # 0:02 to run def test_mnist_local_json(): @@ -167,59 +181,48 @@ def test_mnist_local_json(): shutil.rmtree(out_dir, ignore_errors=True) os.environ[TORNASOLE_CONFIG_FILE_PATH_ENV_STR] = 'tests/tensorflow/hooks/test_json_configs/test_mnist_local.json' hook = TornasoleHook.hook_from_config() - help_test_mnist(path=out_dir, hook=hook) - tr = create_trial(out_dir) - assert len(tr.available_steps()) == 4 - assert len(tr.available_steps(mode=ts.modes.TRAIN)) == 2 - assert len(tr.available_steps(mode=ts.modes.EVAL)) == 2 - assert len(tr.tensors()) == 17 - shutil.rmtree(out_dir, ignore_errors=True) + help_test_mnist(path=out_dir, hook=hook, + num_train_steps=4, num_eval_steps=2) + helper_test_mnist_trial(out_dir) @pytest.mark.slow # 1:04 to run def test_mnist_s3(): - run_id = 'trial_' + datetime.now().strftime('%Y%m%d-%H%M%S%f') - bucket = 'tornasole-testing' - prefix = 'tornasole_tf/hooks/estimator_modes/' + run_id - trial_dir = f's3://{bucket}/{prefix}' - help_test_mnist(trial_dir, ts.SaveConfig(save_interval=2)) + # Takes 1:04 to run, compared to 4 seconds above. + # Speed improvements, or should we migrate integration tests to their own folder? 
+ test_mnist(True) + +def helper_test_multi_save_configs_trial(trial_dir): tr = create_trial(trial_dir) - assert len(tr.available_steps()) == 4 - assert len(tr.available_steps(mode=ts.modes.TRAIN)) == 2 + assert len(tr.available_steps()) == 5, tr.available_steps() + assert len(tr.available_steps(mode=ts.modes.TRAIN)) == 3 assert len(tr.available_steps(mode=ts.modes.EVAL)) == 2 assert len(tr.tensors()) == 17 - delete_s3_prefix(bucket, prefix) + on_s3, bucket, prefix = is_s3(trial_dir) + if not on_s3: + shutil.rmtree(trial_dir) + else: + delete_s3_prefix(bucket, prefix) @pytest.mark.slow # 0:04 to run -def test_mnist_local_multi_save_configs(): +def test_mnist_local_multi_save_configs(on_s3=False): + # Runs in 0:04 run_id = 'trial_' + datetime.now().strftime('%Y%m%d-%H%M%S%f') - trial_dir = os.path.join(TORNASOLE_TF_HOOK_TESTS_DIR, run_id) + if on_s3: + bucket = 'tornasole-testing' + prefix = 'tornasole_tf/hooks/estimator_modes/' + run_id + trial_dir = f's3://{bucket}/{prefix}' + else: + trial_dir = os.path.join(TORNASOLE_TF_HOOK_TESTS_DIR, run_id) help_test_mnist(trial_dir, ts.SaveConfig({ ts.modes.TRAIN: ts.SaveConfigMode(save_interval=2), ts.modes.EVAL: ts.SaveConfigMode(save_interval=3) - })) - tr = create_trial(trial_dir) - assert len(tr.available_steps()) == 3 - assert len(tr.available_steps(mode=ts.modes.TRAIN)) == 2 - assert len(tr.available_steps(mode=ts.modes.EVAL)) == 1 - assert len(tr.tensors()) == 17 - shutil.rmtree(trial_dir) + }), num_train_steps=6, num_eval_steps=4) + helper_test_multi_save_configs_trial(trial_dir) @pytest.mark.slow # 0:52 to run def test_mnist_s3_multi_save_configs(): - run_id = 'trial_' + datetime.now().strftime('%Y%m%d-%H%M%S%f') - bucket = 'tornasole-testing' - prefix = 'tornasole_tf/hooks/estimator_modes/' + run_id - trial_dir = f's3://{bucket}/{prefix}' - help_test_mnist(trial_dir, ts.SaveConfig({ - ts.modes.TRAIN: ts.SaveConfigMode(save_interval=2), - ts.modes.EVAL: ts.SaveConfigMode(save_interval=3) - })) - tr = create_trial(trial_dir) - assert len(tr.available_steps()) == 3 - assert len(tr.available_steps(mode=ts.modes.TRAIN)) == 2 - assert len(tr.available_steps(mode=ts.modes.EVAL)) == 1 - assert len(tr.tensors()) == 17 - delete_s3_prefix(bucket, prefix) + # Takes 0:52 to run, compared to 4 seconds above. Speed improvements? 
+ test_mnist_local_multi_save_configs(True) @pytest.mark.slow # 0:02 to run def test_mnist_local_multi_save_configs_json(): @@ -227,10 +230,5 @@ def test_mnist_local_multi_save_configs_json(): shutil.rmtree(out_dir, ignore_errors=True) os.environ[TORNASOLE_CONFIG_FILE_PATH_ENV_STR] = 'tests/tensorflow/hooks/test_json_configs/test_save_config_modes_hook_config.json' hook = ts.TornasoleHook.hook_from_config() - help_test_mnist(out_dir, hook=hook) - tr = create_trial(out_dir) - assert len(tr.available_steps()) == 3 - assert len(tr.available_steps(mode=ts.modes.TRAIN)) == 2 - assert len(tr.available_steps(mode=ts.modes.EVAL)) == 1 - assert len(tr.tensors()) == 17 - shutil.rmtree(out_dir) + help_test_mnist(out_dir, hook=hook, num_train_steps=6, num_eval_steps=4) + helper_test_multi_save_configs_trial(out_dir) diff --git a/tests/tensorflow/hooks/test_losses.py b/tests/tensorflow/hooks/test_losses.py index 29b724824a..fe7f88f040 100644 --- a/tests/tensorflow/hooks/test_losses.py +++ b/tests/tensorflow/hooks/test_losses.py @@ -10,9 +10,11 @@ def test_mnist_local(): run_id = 'trial_' + datetime.now().strftime('%Y%m%d-%H%M%S%f') trial_dir = os.path.join(TORNASOLE_TF_HOOK_TESTS_DIR, run_id) - help_test_mnist(trial_dir, ts.SaveConfig(save_interval=2)) + help_test_mnist(trial_dir, ts.SaveConfig(save_interval=2), + num_train_steps=4, + num_eval_steps=2) tr = create_trial(trial_dir) assert len(tr.collection('losses').get_tensor_names()) == 1 for t in tr.collection('losses').get_tensor_names(): - assert len(tr.tensor(t).steps()) == 4 + assert len(tr.tensor(t).steps()) == 3 shutil.rmtree(trial_dir) diff --git a/tests/tensorflow/hooks/test_reductions.py b/tests/tensorflow/hooks/test_reductions.py index 83acab688f..01cb498265 100644 --- a/tests/tensorflow/hooks/test_reductions.py +++ b/tests/tensorflow/hooks/test_reductions.py @@ -1,6 +1,7 @@ import os import shutil from datetime import datetime + from tornasole.core.reduction_config import ALLOWED_REDUCTIONS, ALLOWED_NORMS from tornasole.core.json_config import TORNASOLE_CONFIG_FILE_PATH_ENV_STR from tornasole.exceptions import * @@ -8,20 +9,22 @@ from .utils import * -def helper_test_reductions(trial_dir, hook): +def helper_test_reductions(trial_dir, hook, save_raw_tensor): simple_model(hook) _, files = get_dirs_files(trial_dir) - coll = ts.get_collections() from tornasole.trials import create_trial tr = create_trial(trial_dir) - assert len(tr.tensors()) == 3 + assert len(tr.tensors()) == 3, tr.tensors() for tname in tr.tensors(): t = tr.tensor(tname) try: - t.value(0) - assert False - except TensorUnavailableForStep: + print(t.value(0)) + if save_raw_tensor is False: + assert False, (tname, t.value(0)) + except TensorUnavailableForStep as e: + if save_raw_tensor is True: + assert False, (t.name, e) pass assert len(t.reduction_values(0)) == 18 for r in ALLOWED_REDUCTIONS + ALLOWED_NORMS: @@ -29,19 +32,22 @@ def helper_test_reductions(trial_dir, hook): assert t.reduction_value(0, reduction_name=r, abs=b, worker=None) is not None -def test_reductions(): +def test_reductions(save_raw_tensor=False): run_id = 'trial_' + datetime.now().strftime('%Y%m%d-%H%M%S%f') trial_dir = os.path.join('/tmp/tornasole_rules_tests/', run_id) pre_test_clean_up() rdnc = ReductionConfig(reductions=ALLOWED_REDUCTIONS, abs_reductions=ALLOWED_REDUCTIONS, norms=ALLOWED_NORMS, - abs_norms=ALLOWED_NORMS) + abs_norms=ALLOWED_NORMS, + save_raw_tensor=save_raw_tensor) hook = TornasoleHook(out_dir=trial_dir, save_config=SaveConfig(save_interval=1), reduction_config=rdnc) - 
helper_test_reductions(trial_dir, hook) + helper_test_reductions(trial_dir, hook, save_raw_tensor) +def test_reductions_with_raw_tensor(): + test_reductions(save_raw_tensor=True) def test_reductions_json(): trial_dir = "newlogsRunTest1/test_reductions" @@ -50,4 +56,4 @@ def test_reductions_json(): TORNASOLE_CONFIG_FILE_PATH_ENV_STR] = "tests/tensorflow/hooks/test_json_configs/test_reductions.json" pre_test_clean_up() hook = ts.TornasoleHook.hook_from_config() - helper_test_reductions(trial_dir, hook) + helper_test_reductions(trial_dir, hook, False) diff --git a/tests/tensorflow/hooks/test_save_all_full.py b/tests/tensorflow/hooks/test_save_all_full.py index 842f4f7877..33bd831364 100644 --- a/tests/tensorflow/hooks/test_save_all_full.py +++ b/tests/tensorflow/hooks/test_save_all_full.py @@ -24,7 +24,7 @@ def test_save_all_full(hook=None, trial_dir=None): dirs, _ = get_dirs_files(os.path.join(trial_dir, 'events')) coll = get_collections() - assert len(coll) == 6 + assert all([x in coll.keys() for x in ['all','weights','gradients','losses','optimizer_variables']]) assert len(coll['weights'].tensor_names) == 1 assert len(coll['gradients'].tensor_names) == 1 assert len(coll['losses'].tensor_names) == 1 @@ -32,7 +32,7 @@ def test_save_all_full(hook=None, trial_dir=None): assert TORNASOLE_DEFAULT_COLLECTIONS_FILE_NAME in files cm = CollectionManager.load(join(trial_dir, TORNASOLE_DEFAULT_COLLECTIONS_FILE_NAME)) - assert len(cm.collections) == 6 + assert len(cm.collections) == len(coll), (coll, cm.collections) assert len(cm.collections['weights'].tensor_names) == 1 assert len(cm.collections['losses'].tensor_names) == 1 assert len(cm.collections['gradients'].tensor_names) == 1 @@ -43,7 +43,6 @@ def test_save_all_full(hook=None, trial_dir=None): len(cm.collections['gradients'].tensor_names) num_tensors_collection = len(coll['weights'].tensor_names) + \ len(coll['gradients'].tensor_names) - assert num_tensors_collection == num_tensors_loaded_collection assert len(dirs) == 5 for step in dirs: @@ -55,9 +54,10 @@ def test_save_all_full(hook=None, trial_dir=None): for x in fr.read_tensors(): tensor_name, step, tensor_data, mode, mode_step = x i += 1 + print(tensor_name) size += tensor_data.nbytes - assert i == 85 - assert size == 1470 + assert i == 84 + assert size == 1462 if hook_created: shutil.rmtree(trial_dir) diff --git a/tests/tensorflow/hooks/test_save_config.py b/tests/tensorflow/hooks/test_save_config.py index 29312ed7c7..d771aee989 100644 --- a/tests/tensorflow/hooks/test_save_config.py +++ b/tests/tensorflow/hooks/test_save_config.py @@ -90,7 +90,7 @@ def test_save_config_start_and_end_json(): def helper_save_config_modes(trial_dir, hook): - help_test_mnist(trial_dir, hook=hook) + help_test_mnist(trial_dir, hook=hook, num_train_steps=4, num_eval_steps=3) tr = create_trial(trial_dir) for tname in tr.tensors_in_collection('weights'): t = tr.tensor(tname) diff --git a/tests/tensorflow/hooks/test_save_reductions.py b/tests/tensorflow/hooks/test_save_reductions.py index 943edda8b3..0104ea709a 100644 --- a/tests/tensorflow/hooks/test_save_reductions.py +++ b/tests/tensorflow/hooks/test_save_reductions.py @@ -10,13 +10,12 @@ def helper_save_reductions(trial_dir, hook): simple_model(hook) _, files = get_dirs_files(trial_dir) coll = get_collections() - assert len(coll) == 5 assert len(coll['weights'].tensor_names) == 1 assert len(coll['gradients'].tensor_names) == 1 assert TORNASOLE_DEFAULT_COLLECTIONS_FILE_NAME in files cm = CollectionManager.load(join(trial_dir, 
TORNASOLE_DEFAULT_COLLECTIONS_FILE_NAME)) - assert len(cm.collections) == 5 + assert len(cm.collections) == len(coll) assert len(cm.collections['weights'].tensor_names) == 1 assert len(cm.collections['gradients'].tensor_names) == 1 # as we hadn't asked to be saved @@ -60,8 +59,6 @@ def test_save_reductions(): helper_save_reductions(trial_dir, hook) - - def test_save_reductions_json(): trial_dir = "newlogsRunTest1/test_save_reductions" shutil.rmtree(trial_dir, ignore_errors=True) diff --git a/tests/tensorflow/hooks/test_training_end.py b/tests/tensorflow/hooks/test_training_end.py index 5b63fd34bf..329e810378 100644 --- a/tests/tensorflow/hooks/test_training_end.py +++ b/tests/tensorflow/hooks/test_training_end.py @@ -18,6 +18,6 @@ def test_training_job_has_ended(): [sys.executable, "examples/tensorflow/scripts/simple.py", "--tornasole_path", trial_dir, '--steps', '10', '--tornasole_frequency', '5'], - env={'CUDA_VISIBLE_DEVICES':'-1'}) + env={'CUDA_VISIBLE_DEVICES':'-1', 'TORNASOLE_LOG_LEVEL': 'debug'}) assert has_training_ended(trial_dir) == True shutil.rmtree(trial_dir) diff --git a/tests/tensorflow/hooks/test_write.py b/tests/tensorflow/hooks/test_write.py index 074d7d7684..5bc5020877 100644 --- a/tests/tensorflow/hooks/test_write.py +++ b/tests/tensorflow/hooks/test_write.py @@ -6,7 +6,7 @@ from tornasole.tensorflow import reset_collections from .utils import * from tornasole.core.reader import FileReader -from tornasole.core.locations import EventFileLocation +from tornasole.core.locations import TensorFileLocation from tornasole.core.json_config import TORNASOLE_CONFIG_FILE_PATH_ENV_STR import tornasole.tensorflow as ts @@ -65,7 +65,7 @@ def helper_tornasole_hook_write(data_dir, hook): # read saved weights from disk using summary iterator, verify if in-memory weights at end of training # are identical to the ones we have saved using TornasoleHook - step_dir = EventFileLocation.get_step_dir_path(data_dir, 999) + step_dir = TensorFileLocation.get_step_dir_path(data_dir, 999) files = os.listdir(step_dir) print(v.keys()) for f in files: diff --git a/tornasole/core/collection.py b/tornasole/core/collection.py index 34a428ac4d..1f018f86c8 100644 --- a/tornasole/core/collection.py +++ b/tornasole/core/collection.py @@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Union ALLOWED_PARAMS = ['name', 'include_regex', 'reduction_config', 'save_config', - 'tensor_names'] + 'tensor_names', 'save_histogram'] class CollectionKeys: DEFAULT = 'default' @@ -19,8 +19,6 @@ class CollectionKeys: SCALARS = 'scalars' TENSORFLOW_SUMMARIES = 'tensorflow_summaries' - SCALAR_SUMMARIES = 'scalar_summaries' - HISTOGRAMS = 'histograms' #XGBOOST METRIC = "metric" @@ -29,6 +27,18 @@ class CollectionKeys: FEATURE_IMPORTANCE = "feature_importance" AVERAGE_SHAP = "average_shap" +# Collection with summary objects instead of tensors +# so we don't create summaries or reductions of these +SUMMARIES_COLLECTIONS = { + CollectionKeys.TENSORFLOW_SUMMARIES +} + + +NON_HISTOGRAM_COLLECTIONS = { + CollectionKeys.LOSSES, CollectionKeys.SCALARS, + CollectionKeys.TENSORFLOW_SUMMARIES +} + class Collection: """ @@ -56,11 +66,12 @@ class Collection: if this is not passed, uses the default save_config """ def __init__(self, name, include_regex=None, tensor_names=None, - reduction_config=None, save_config=None): + reduction_config=None, save_config=None, save_histogram=True): self.name = name self.include_regex = include_regex if include_regex is not None else [] self.set_reduction_config(reduction_config) 
self.set_save_config(save_config) + self.save_histogram = save_histogram # todo: below comment is broken now that we have set. do we need it back? # we want to maintain order here so that different collections can be analyzed together @@ -129,7 +140,8 @@ def to_json_dict(self) -> Dict: "include_regex": self.include_regex, "tensor_names": sorted(list(self.tensor_names)) if self.tensor_names else [], # Sort for determinism "reduction_config": self.reduction_config.to_json_dict() if self.reduction_config else None, - "save_config": self.save_config.to_json_dict() if self.save_config else None + "save_config": self.save_config.to_json_dict() if self.save_config else None, + "save_histogram": self.save_histogram } def to_json(self) -> str: @@ -145,7 +157,8 @@ def from_dict(cls, params: Dict) -> 'Collection': "include_regex": params.get("include_regex", False), "tensor_names": set(params.get("tensor_names", [])), "reduction_config": ReductionConfig.from_dict(params["reduction_config"]) if "reduction_config" in params else None, - "save_config": SaveConfig.from_dict(params["save_config"]) if "save_config" in params else None + "save_config": SaveConfig.from_dict(params["save_config"]) if "save_config" in params else None, + "save_histogram": params.get("save_histogram", True) } return cls(**res) @@ -159,6 +172,11 @@ def __str__(self): def __hash__(self): return hash(self.name) + def __repr__(self): + return ( + f"" + ) + def __eq__(self, other): if not isinstance(other, Collection): return NotImplemented @@ -167,5 +185,5 @@ def __eq__(self, other): self.include_regex == other.include_regex and \ self.tensor_names == other.tensor_names and \ self.reduction_config == other.reduction_config and \ - self.save_config == other.save_config - + self.save_config == other.save_config and \ + self.save_histogram == other.save_histogram diff --git a/tornasole/core/hook.py b/tornasole/core/hook.py index 057be15061..63a052e1c6 100644 --- a/tornasole/core/hook.py +++ b/tornasole/core/hook.py @@ -2,12 +2,13 @@ from abc import ABCMeta, abstractmethod import re as _re import os +import numpy as np from typing import Optional, List, Union, Tuple, Dict, Set -from tornasole.core.utils import match_inc +from tornasole.core.utils import match_inc, size_and_shape from tornasole.core.reduction_config import ReductionConfig +from tornasole.core.collection import CollectionKeys, SUMMARIES_COLLECTIONS, NON_HISTOGRAM_COLLECTIONS from tornasole.core.collection_manager import CollectionManager -from tornasole.core.collection import CollectionKeys from tornasole.core.save_config import SaveConfig, SaveConfigMode from tornasole.core.access_layer import training_has_ended from tornasole.core.hook_utils import verify_and_get_out_dir @@ -84,6 +85,8 @@ def __init__(self, self.save_all = save_all self.save_config = SaveConfig.parse(save_config) + if reduction_config is None: + reduction_config = ReductionConfig(save_raw_tensor=True) self.reduction_config = reduction_config self.include_regex = include_regex self.collection_manager = collection_manager @@ -108,15 +111,15 @@ def __init__(self, self.logger.warn('The `default` collection was not passed to ' 'include_collections. 
So it is not being saved') - self.prepared_collections = False self._collections_to_save = set() - # todo clear cache for old steps - self.save_states_cache = {} + self._collections_to_save_for_step = None + self.prepared_collections = False self.tensor_to_collections = {} self.step = init_step self.mode = ModeKeys.GLOBAL self.mode_steps = {ModeKeys.GLOBAL: init_step} self.writer = None + self.tb_writers = {} self.logger.info('Saving to {}'.format(self.out_dir)) atexit.register(self._cleanup) @@ -141,16 +144,16 @@ def _get_all_collections_to_save(self) -> Set['Collection']: self._assert_prep() return self._collections_to_save - def _get_collections_to_save_for_step(self, mode, step) -> Set['Collection']: - """Mark the proper collections to be saved, return a set of those.""" - self._assert_prep() - if (mode, step) not in self.save_states_cache: - coll_to_save_for_step = set() + def _get_collections_to_save_for_step(self) -> Set['Collection']: + if self._collections_to_save_for_step is None: + self._assert_prep() + s = set() for coll in self._collections_to_save: - if coll.get_save_config().should_save_step(mode, step): - coll_to_save_for_step.add(coll) - self.save_states_cache[(mode, step)] = coll_to_save_for_step - return self.save_states_cache[(mode, step)] + if coll.get_save_config().should_save_step( + self.mode, self.mode_steps[self.mode]): + s.add(coll) + self._collections_to_save_for_step = s + return self._collections_to_save_for_step def _get_collections_with_tensor(self, tensor_name) -> Set['Collection']: self._assert_prep() @@ -169,13 +172,12 @@ def _get_collections_with_tensor(self, tensor_name) -> Set['Collection']: self.tensor_to_collections[tensor_name] = matched_colls return self.tensor_to_collections[tensor_name] - def _should_save_tensor_for_step(self, tensorname, mode, step) -> bool: + def _should_save_tensor_for_step(self, tensorname) -> bool: """Returns whether tensorname should be saved for this mode, mode_step as a bool """ - colls_to_save = self._get_collections_to_save_for_step(mode, step) for coll in self._get_collections_with_tensor(tensorname): - if coll in colls_to_save: + if coll in self._get_collections_to_save_for_step(): return True return False @@ -189,6 +191,10 @@ def _prepare_collections(self): # Populate configs_for_collections and reduction_config for c_name, c in self.collection_manager.get_collections().items(): + + if c_name in NON_HISTOGRAM_COLLECTIONS: + c.save_histogram = False + if c.save_config is None: # Set to the default if None c.save_config = self.save_config @@ -199,33 +205,78 @@ def _prepare_collections(self): raise TypeError( f"save_config={c.save_config} must be None or SaveConfig") - if c.get_reduction_config() is None and self.reduction_config is not None: + if c_name in SUMMARIES_COLLECTIONS: + c.set_reduction_config(ReductionConfig(save_raw_tensor=True)) + elif c.get_reduction_config() is None: c.set_reduction_config(self.reduction_config) + self.prepared_collections = True #### End of Save Manager methods #### - def _flush_and_close_writer(self) -> None: + def _close_writer(self) -> None: if self.dry_run: return + if self.writer is not None: self.writer.flush() self.writer.close() self.writer = None + def _close_writers(self) -> None: + if self.dry_run: + return + + self._close_writer() + to_delete_writers = [] + for mode, writer in self.tb_writers.items(): + if writer is not None: + writer.flush() + writer.close() + to_delete_writers.append(mode) + for mode in to_delete_writers: + del self.tb_writers[mode] + def 
_initialize_writer(self) -> None: if self.dry_run: return self.writer = FileWriter( trial_dir=self.out_dir, step=self.step, worker=self.worker) + def _get_tb_writer(self): + if self.mode in self.tb_writers: + assert self.tb_writers[self.mode] is not None + # would be there if set_mode was called + return self.tb_writers[self.mode] + else: + # s = self.step + # if s < 0: s = 0 + self.tb_writers[self.mode] = FileWriter( + trial_dir=self.out_dir, + step=self.step, + worker=self.worker, + write_checksum=True, + wtype='tensorboard', + mode=self.mode + ) + return self.tb_writers[self.mode] + + def _close_tb_writer(self): + if self.dry_run: + return + + if self.mode in self.tb_writers: + self.tb_writers[self.mode].close() + del self.tb_writers[self.mode] + def _cleanup(self): - self._flush_and_close_writer() + self._close_writers() training_has_ended(self.out_dir) def _increment_step(self): self.step += 1 self.mode_steps[self.mode] += 1 + self._collections_to_save_for_step = None def set_mode(self, mode): # train @@ -238,11 +289,167 @@ def set_mode(self, mode): if mode not in self.mode_steps: self.mode_steps[mode] = self.init_step + self._collections_to_save_for_step = None + def export_collections(self): collection_file_name = f'{self.worker}_collections.json' self.collection_manager.export(os.path.join( self.out_dir, collection_file_name)) + def _write_reduction(self, tensor_name, tensor_value, reduction_name, abs): + reduction_tensor_name = get_reduction_tensor_name( + tensor_name, reduction_name, abs) + tensor_data = self._get_reduction_of_data( + reduction_name, tensor_value, tensor_name, abs) + self._write_raw_tensor_simple(reduction_tensor_name, tensor_data) + + def _write_reductions(self, tensor_name, tensor_value, save_collections): + reductions_saved = set() + for s_col in save_collections: + reduction_config = s_col.get_reduction_config() + for reduction_list in (reduction_config.reductions, + reduction_config.norms): + for reduction in reduction_list: + if (reduction, abs) not in reductions_saved: + self._write_reduction( + tensor_name, tensor_value, reduction, abs=False) + reductions_saved.add((reduction, False)) + for reduction_list in (reduction_config.abs_reductions, + reduction_config.abs_norms): + for reduction in reduction_list: + if (reduction, abs) not in reductions_saved: + self._write_reduction( + tensor_name, tensor_value, reduction, abs=True) + reductions_saved.add((reduction, True)) + + def _write_scalar_summary(self, tensor_name, tensor_value, save_colls): + for s_col in save_colls: + if s_col.name in [CollectionKeys.LOSSES, CollectionKeys.SCALARS]: + np_val = self._make_numpy_array(tensor_value) + if self.dry_run: + return + + if np_val.squeeze().ndim == 0: + self._get_tb_writer().write_scalar_summary( + tensor_name, np_val, self.step) + else: + self.logger.debug( + f'Value of {tensor_name} is not scalar, ' + f'so scalar summary could not be created') + break + + def _write_histogram_summary(self, tensor_name, tensor_value, save_collections): + for s_col in save_collections: + if s_col.name in SUMMARIES_COLLECTIONS: + continue + elif s_col.save_histogram is True: + np_value = self._make_numpy_array(tensor_value) + if self.dry_run or np_value.dtype == np.bool or np_value.nbytes == 0: + return + + hist_name = f'histograms/{s_col.name}/{tensor_name}' + self._get_tb_writer().write_histogram_summary( + tdata=np_value, + tname=hist_name, + global_step=self.step) + break + + # Fix step number for saving scalar and tensor + # def save_scalar(self, name, value): + # 
get_collection(CollectionKeys.SCALARS).add_tensor_name(name) + # if self.writer is None: + # self._init_writer() + # val = make_numpy_array(value) + # if val.size != 1: + # raise TypeError( + # f'{name} has non scalar value of type: {type(value)}') + # self._save_scalar_summary(name, val) + # logger.debug(f'Saving scalar {name} {val} for step {self.step} {self.mode} {self.mode_steps[self.mode]}') + # self._save_raw_tensor(name, val) + + # def save_tensor(self, name, value): + # # todo: support to add these tensors to any collection. + # # complication here is that we need to export the file again + # # todo: what happens if name is conflicting + # if self.writer is None: + # self._init_writer() + # self._save_raw_tensor(name, value) + + def _write_raw_tensor(self, tensor_name, tensor_value, save_collections): + for s_col in save_collections: + reduction_config = s_col.get_reduction_config() + if reduction_config.save_raw_tensor is True: + self._write_raw_tensor_simple(tensor_name, tensor_value) + break + + def _write_raw_tensor_simple(self, tensor_name, tensor_value): + # todo: if fp16, check perf of saving as fp16 in proto vs as fp32 + numpy_tensor_value = self._make_numpy_array(tensor_value) + this_size, this_shape = size_and_shape(numpy_tensor_value) + if self.dry_run is False and this_size > 0: + self.writer.write_tensor( + tdata=numpy_tensor_value, tname=tensor_name, mode=self.mode, + mode_step=self.mode_steps[self.mode]) + + def _save_for_tensor(self, tensor_name, tensor_value, + check_before_write=True): + if check_before_write and \ + self._should_save_tensor_for_step(tensorname=tensor_name) is False: + return + self.logger.debug(f'Processing {tensor_name} for global step {self.step}') + + save_collections = self._get_collections_with_tensor(tensor_name) + save_collections_for_tensor = save_collections.intersection( + self._get_collections_to_save_for_step()) + self._write_for_tensor(tensor_name, tensor_value, save_collections_for_tensor) + + def _write_for_tensor(self, tensor_name, tensor_value, save_collections): + """ + Write all data that we might want to for this tensor + :param tensor_name: name of tensor + :param tensor_value: value (could be in framework tensor dtype) + :param save_collections: list of collections which are being saved for this step + """ + # write reductions defined for collections this tensor may be part of + self._write_reductions(tensor_name, tensor_value, save_collections) + + # write histogram for this tensor if any collection this tensor + # is part of has save_histogram as True + self._write_histogram_summary(tensor_name, tensor_value, save_collections) + + # write raw tensor if save_raw_tensor in reduction config is True + self._write_raw_tensor(tensor_name, tensor_value, save_collections) + + # writes scalar summary if this value is a scalar (or 1x1 array) + self._write_scalar_summary(tensor_name, tensor_value, save_collections) + + @staticmethod + @abstractmethod + def _get_reduction_of_data(reduction_name, tensor_value, tensor_name, abs): + """ + Returns the reduction of given tensor + :param reduction_name: str + type of reduction + :param tensor_value: tensor_data_type + reduction to be performed on this original tensor value + :param tensor_name: str + name of original tensor + :param abs: bool + whether to take absolute value of tensor before performing reduction + :return: + """ + pass + + @staticmethod + @abstractmethod + def _make_numpy_array(tensor_value): + """ + Convert the tensor value into a numpy array + :param tensor_value: 
mx.nd.NDArray, torch.Tensor, etc + :return: numpy ndarray + """ + pass + class CallbackHook(BaseHook): __metaclass__ = ABCMeta @@ -275,8 +482,6 @@ def __init__(self, self.last_saved_step = None self.exported_collections = False self.data_type_name = data_type_name - # collections that need to be saved in a particular step. - self.collections_in_this_step = None def _cleanup(self): if not self.exported_collections: @@ -284,19 +489,13 @@ def _cleanup(self): self.exported_collections = True super()._cleanup() - def _process_step(self) -> Set['Collection']: - # returns set of collections which need to be saved for step - self.collections_in_this_step = self._get_collections_to_save_for_step( - self.mode, self.mode_steps[self.mode]) - return self.collections_in_this_step - def _write(self, module_name, var, suffix, idx): if self.data_type_name is None: raise RuntimeError( "This method can not be called when data_type is None") if var.__class__.__name__ == self.data_type_name: - self._write_tensor(module_name + suffix + str(idx), var) + self._save_for_tensor(module_name + suffix + str(idx), var) return idx + 1 elif isinstance(var, tuple) or isinstance(var, list): for val in var: @@ -313,87 +512,6 @@ def _write_inputs(self, name, inputs): def _write_outputs(self, name, outputs): self._write(name, outputs, CallbackHook.OUTPUT_TENSOR_SUFFIX, idx=0) - def _write_reduction(self, tensor_name, tensor_value, reduction_name, abs): - reduction_tensor_name = get_reduction_tensor_name( - tensor_name, reduction_name, abs) - tensor_data = self._get_reduction_of_data( - reduction_name, tensor_value, tensor_name, abs) - tensor_value_np = self._make_numpy_array(tensor_data) - self.writer.write_tensor(tdata=tensor_value_np, - tname=reduction_tensor_name, - mode=self.mode, - mode_step=self.mode_steps[self.mode]) - - def _write_reductions(self, tensor_name, tensor_value, reduction_config): - for reduction_list in (reduction_config.reductions, - reduction_config.norms): - for reduction in reduction_list: - self._write_reduction( - tensor_name, tensor_value, reduction, abs=False) - for reduction_list in (reduction_config.abs_reductions, - reduction_config.abs_norms): - for reduction in reduction_list: - self._write_reduction( - tensor_name, tensor_value, reduction, abs=True) - - @staticmethod @abstractmethod - def _get_reduction_of_data(reduction_name, tensor_value, tensor_name, abs): - """ - Returns the reduction of given tensor - :param reduction_name: str - type of reduction - :param tensor_value: tensor_data_type - reduction to be performed on this original tensor value - :param tensor_name: str - name of original tensor - :param abs: bool - whether to take absolute value of tensor before performing reduction - :return: - """ - pass - - @staticmethod - @abstractmethod - def _make_numpy_array(tensor_value): - """ - Convert the tensor value into a numpy array - :param tensor_value: mx.nd.NDArray, torch.Tensor, etc - :return: numpy ndarray - """ + def _export_model(self): pass - - def _write_tensor(self, tensor_name, tensor_value): - if self.dry_run or \ - self._should_save_tensor_for_step( - tensorname=tensor_name, mode=self.mode, - step=self.mode_steps[self.mode]) is False: - return - - save_collections = self._get_collections_with_tensor(tensor_name) - for save_collection in save_collections: - if save_collection in self.collections_in_this_step: - reduction_config = save_collection.get_reduction_config() - if reduction_config is not None: - self._write_reductions( - tensor_name, tensor_value, reduction_config) - 
else: - tensor_value = self._make_numpy_array(tensor_value) - self.writer.write_tensor( - tdata=tensor_value, - tname=tensor_name, - mode=self.mode, - mode_step=self.mode_steps[self.mode]) - - - @staticmethod - def clean_tag(name): - if name is not None: - new_name = CallbackHook.INVALID_TAG_CHARACTERS.sub('_', name) - new_name = new_name.lstrip('/') # Remove leading slashes - if new_name != name: - logger.warning( - 'Summary name %s is illegal; using %s instead.', name, - new_name) - name = new_name - return name diff --git a/tornasole/core/index_reader.py b/tornasole/core/index_reader.py index 23a547de36..cfba2e080c 100644 --- a/tornasole/core/index_reader.py +++ b/tornasole/core/index_reader.py @@ -38,6 +38,7 @@ def get_s3_responses(bucket_name, prefix_name, start_after_key, range_steps=None workers = [] index_files, last_index_token = S3IndexReader.list_all_index_files_from_s3(bucket_name, prefix_name, start_after_key) + logger.debug(','.join(index_files)) for index_file in index_files: step = IndexFileLocationUtils.parse_step_from_index_file_name(index_file) if (range_steps is not None and step_in_range(range_steps, step)) or \ @@ -51,9 +52,9 @@ def get_s3_responses(bucket_name, prefix_name, start_after_key, range_steps=None @staticmethod def list_all_index_files_from_s3(bucket_name, prefix_name, start_after_key=None): - index_files, last_index_token = list_s3_objects(bucket_name, - IndexFileLocationUtils.get_index_path(prefix_name), - start_after_key) + index_files, last_index_token = list_s3_objects( + bucket_name, IndexFileLocationUtils.get_index_path(prefix_name), + start_after_key) return index_files, last_index_token diff --git a/tornasole/core/locations.py b/tornasole/core/locations.py index 19b4816d43..efc6b046b8 100644 --- a/tornasole/core/locations.py +++ b/tornasole/core/locations.py @@ -1,9 +1,10 @@ import os import re - +from abc import ABC, abstractmethod from .utils import get_immediate_subdirectories from .logger import get_logger + logger = get_logger() @@ -28,50 +29,90 @@ def to_dict(self): STEP_NUMBER_FORMATTING_LENGTH = '012' -class EventFileLocation: - def __init__(self, step_num, worker_name, type='events'): +class EventFileLocation(ABC): + def __init__(self, step_num, worker_name): self.step_num = int(step_num) self.worker_name = worker_name - self.type = type + self.type = None - def get_location(self, trial_dir=''): - step_num_str = str(format(self.step_num, STEP_NUMBER_FORMATTING_LENGTH)) + def get_step_num_str(self): + return str(format(self.step_num, STEP_NUMBER_FORMATTING_LENGTH)) + + def get_filename(self): + step_num_str = self.get_step_num_str() event_filename = f"{step_num_str}_{self.worker_name}.tfevents" - if trial_dir: - event_key_prefix = os.path.join(trial_dir, self.type) - else: - event_key_prefix = self.type - return os.path.join(event_key_prefix, step_num_str, event_filename) + return event_filename - @staticmethod - def match_regex(s): - return EventFileLocation.load_filename(s, print_error=False) + @classmethod + def match_regex(cls, s): + return cls.load_filename(s, print_error=False) - @staticmethod - def load_filename(s, print_error=True): + @classmethod + def load_filename(cls, s, print_error=True): event_file_name = os.path.basename(s) m = re.search('(.*)_(.*).tfevents$', event_file_name) if m: step_num = int(m.group(1)) worker_name = m.group(2) - return EventFileLocation(step_num=step_num, worker_name=worker_name) + return cls(step_num=step_num, worker_name=worker_name) else: if print_error: logger.error('Failed to load efl: ', s) 
return None @staticmethod - def get_step_dirs(trial_dir): - return get_immediate_subdirectories(os.path.join(trial_dir, - 'events')) + @abstractmethod + def get_dir(trial_dir): + pass + + +class TensorFileLocation(EventFileLocation): + def __init__(self, step_num, worker_name): + super().__init__(step_num, worker_name) + self.type = 'events' @staticmethod - def get_step_dir_path(trial_dir, step_num): + def get_dir(trial_dir): + return os.path.join(trial_dir, 'events') + + def get_file_location(self, trial_dir=''): + if trial_dir: + event_key_prefix = self.get_dir(trial_dir) + else: + event_key_prefix = self.type + return os.path.join( + event_key_prefix, self.get_step_num_str(), self.get_filename()) + + @classmethod + def get_step_dirs(cls, trial_dir): + return get_immediate_subdirectories(cls.get_dir(trial_dir)) + + @classmethod + def get_step_dir_path(cls, trial_dir, step_num): step_num = int(step_num) - return os.path.join(trial_dir, 'events', + return os.path.join(cls.get_dir(trial_dir), format(step_num, STEP_NUMBER_FORMATTING_LENGTH)) +class TensorboardFileLocation(EventFileLocation): + def __init__(self, step_num, worker_name, mode=None): + super().__init__(step_num, worker_name) + self.mode = mode + self.type = 'tensorboard' + + @staticmethod + def get_dir(trial_dir): + return os.path.join(trial_dir, 'tensorboard') + + def get_file_location(self, trial_dir=''): + if trial_dir: + event_key_prefix = os.path.join(self.get_dir(trial_dir), self.mode.name) + else: + event_key_prefix = os.path.join(self.type, self.mode.name) + + return os.path.join(event_key_prefix, self.get_filename()) + + class IndexFileLocationUtils: # These functions are common to index reader and index writer MAX_INDEX_FILE_NUM_IN_INDEX_PREFIX = 1000 @@ -87,21 +128,20 @@ def next_index_prefix_for_step(step_num): return format(index_prefix_for_step + 1, '09') @staticmethod - def indexS3Key(trial_prefix, index_prefix_for_step_str, step_num, worker_name): + def _get_index_key(trial_prefix, step_num, worker_name): + index_prefix_for_step_str = IndexFileLocationUtils.\ + get_index_prefix_for_step(step_num) step_num_str = format(step_num, '012') index_filename = format(f"{step_num_str}_{worker_name}.json") - index_key = format(f"{trial_prefix}/index/{index_prefix_for_step_str}/{index_filename}") + index_key = format( + f"{trial_prefix}/index/{index_prefix_for_step_str}/{index_filename}") return index_key - # for a step_num index files lies in prefix step_num/MAX_INDEX_FILE_NUM_IN_INDEX_PREFIX + # for a step_num index files lies + # in prefix step_num/MAX_INDEX_FILE_NUM_IN_INDEX_PREFIX @staticmethod def get_index_key_for_step(trial_prefix, step_num, worker_name): - index_prefix_for_step_str = IndexFileLocationUtils.get_index_prefix_for_step(step_num) - return IndexFileLocationUtils.indexS3Key(trial_prefix, index_prefix_for_step_str, step_num, worker_name) - # let's assume worker_name is given by hook - # We need to think on naming conventions and access patterns for: - # 1) muti-node training --> data parallel - # 2) multi gpu training --> model parallel + return IndexFileLocationUtils._get_index_key(trial_prefix, step_num, worker_name) @staticmethod def get_step_from_idx_filename(index_file_name): @@ -113,6 +153,7 @@ def get_step_from_idx_filename(index_file_name): @staticmethod def parse_step_from_index_file_name(index_file_name): # 10 = prefix/index/000000000/000000000010_worker.json' + # 10 = prefix/index/000000000/000000000010_worker_EVAL.json' base_file_name = os.path.basename(index_file_name) step = 
int(base_file_name.split('_')[0]) return step @@ -120,4 +161,3 @@ def parse_step_from_index_file_name(index_file_name): @staticmethod def get_index_path(path): return os.path.join(path, 'index') - diff --git a/tornasole/core/reader.py b/tornasole/core/reader.py index 37cc3f89ba..e8884cd066 100644 --- a/tornasole/core/reader.py +++ b/tornasole/core/reader.py @@ -20,7 +20,7 @@ from tornasole.core.tfevent.event_file_reader import EventFileReader -class FileReader(): +class FileReader: def __init__(self, fname, wtype='tfevent'): """Creates a `FileWriter` and an file. On construction the summary writer creates a new event file in `logdir`. @@ -49,8 +49,8 @@ def __exit__(self, exc_type, exc_value, traceback): """Make usable with "with" statement.""" self._reader.__exit__(exc_type, exc_value, traceback) - def read_tensors(self, check=False): + def read_tensors(self, check='minimal'): + if check is True: + check = 'minimal' return self._reader.read_tensors(check=check) - #def __del__(self): - # self._reader.__del__() diff --git a/tornasole/core/s3_utils.py b/tornasole/core/s3_utils.py index b40e867808..e8e32ee73e 100644 --- a/tornasole/core/s3_utils.py +++ b/tornasole/core/s3_utils.py @@ -17,7 +17,6 @@ def list_s3_objects(bucket, prefix, start_after_key=None, delimiter=""): last_token = None if start_after_key is None: start_after_key = prefix - logger.debug(f'Trying to load index files after {start_after_key}') req = ListRequest(Bucket=bucket, Prefix=prefix, StartAfter=start_after_key, diff --git a/tornasole/core/tensor.py b/tornasole/core/tensor.py index 3d7a0df49c..268b776816 100644 --- a/tornasole/core/tensor.py +++ b/tornasole/core/tensor.py @@ -1,13 +1,14 @@ -from .reductions import get_numpy_reduction -from tornasole.core.modes import ModeKeys -from tornasole.exceptions import * -from tornasole.core.index_reader import IndexReader -from tornasole.core.locations import TensorLocation - -from enum import Enum import bisect +from enum import Enum +from typing import Dict, Tuple + import numpy as np -from typing import Any, Dict, List, Tuple + +from tornasole.exceptions import * +from .locations import TensorLocation +from .index_reader import IndexReader +from .modes import ModeKeys +from .reductions import get_numpy_reduction class StepState(Enum): diff --git a/tornasole/core/tfevent/event_file_reader.py b/tornasole/core/tfevent/event_file_reader.py index 5a328d808b..d17340798a 100644 --- a/tornasole/core/tfevent/event_file_reader.py +++ b/tornasole/core/tfevent/event_file_reader.py @@ -18,7 +18,6 @@ """Reads events from disk.""" import tornasole.core.tfevent.proto.types_pb2 as types_pb2 -import logging import numpy as np from .proto.event_pb2 import Event @@ -27,9 +26,6 @@ from tornasole.core.modes import ModeKeys, MODE_STEP_PLUGIN_NAME, MODE_PLUGIN_NAME -#todo: remove this logger perhaps -logging.basicConfig() - def as_dtype(t): _INTERN_TABLE = { types_pb2.DT_HALF: np.float16, @@ -43,7 +39,6 @@ def as_dtype(t): return _INTERN_TABLE[t] - def get_tensor_data(tensor): shape = [d.size for d in tensor.tensor_shape.dim] # num_elements = np.prod(shape, dtype=np.int64) @@ -54,7 +49,7 @@ def get_tensor_data(tensor): r = tensor.string_val[i] r = r.decode('utf-8') res.append(r) - return res + return np.array(res) dtype = as_dtype(tensor.dtype) #dtype = tensor_dtype.as_numpy_dtype @@ -88,17 +83,13 @@ class EventsReader(object): EventsReader defined in https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/util/events_writer.cc""" def __init__(self, filename): - """ - Events files have a 
name of the form - '/file/path/events.out.tfevents.[timestamp].[hostname][file_suffix]' - """ self._filename = filename self._tfrecord_reader = RecordReader(self._filename) def __exit__(self, exc_type, exc_value, traceback): self._tfrecord_reader.__exit__(exc_type, exc_value, traceback) - def read_events(self, check=True): + def read_events(self, check='minimal'): while self._tfrecord_reader.has_data(): rec = self._tfrecord_reader.read_record(check=check) event = Event() @@ -130,32 +121,33 @@ def __init__(self, fname): def __exit__(self,exc_type, exc_value, traceback): self._ev_reader.__exit__(exc_type, exc_value, traceback) - def read_tensors(self, check=False): + def _get_mode_modestep(self, step, plugin_data): + mode_step = step + mode = ModeKeys.GLOBAL + for metadata in plugin_data: + if metadata.plugin_name == MODE_STEP_PLUGIN_NAME: + mode_step = int(metadata.content) + if metadata.plugin_name == MODE_PLUGIN_NAME: + mode = ModeKeys(int(metadata.content)) + return mode, mode_step + + def read_tensors(self, check='minimal'): for step, summ in self.read_summaries(check=check): for v in summ.value: - assert v.WhichOneof('value') == 'tensor' + val = v.WhichOneof('value') + assert val == 'tensor' tensor_name = v.tag # We have found the right tensor at the right step tensor_data = get_tensor_data(v.tensor) - - # default values - # todo: validate the logic extensively - mode_step = step - mode = ModeKeys.GLOBAL - for metadata in v.metadata.plugin_data: - if metadata.plugin_name == MODE_STEP_PLUGIN_NAME: - mode_step = int(metadata.content) - if metadata.plugin_name == MODE_PLUGIN_NAME: - mode = ModeKeys(int(metadata.content)) + mode, mode_step = self._get_mode_modestep( + step, v.metadata.plugin_data) yield (tensor_name, step, tensor_data, mode, mode_step) - def read_summaries(self, check=True): + def read_summaries(self, check='minimal'): for ev in self.read_events(check=check): - #assert ev.HasField('step') - if not ev.HasField('summary'): - continue - assert ev.HasField('summary') - yield (ev.step, ev.summary) + # graph gets bypassed here + if ev.HasField('summary'): + yield (ev.step, ev.summary) - def read_events(self,check=True): + def read_events(self, check='minimal'): return self._ev_reader.read_events(check=check) diff --git a/tornasole/core/tfevent/event_file_writer.py b/tornasole/core/tfevent/event_file_writer.py index 9d4b0a42d8..4130cfff62 100644 --- a/tornasole/core/tfevent/event_file_writer.py +++ b/tornasole/core/tfevent/event_file_writer.py @@ -17,22 +17,14 @@ """Writes events to disk in a trial dir.""" -import numpy as np -import os.path -import socket import threading import time import six - from tornasole.core.locations import TensorLocation -from .events_writer import EventsWriter -from .proto.event_pb2 import Event -from .proto.summary_pb2 import Summary, SummaryMetadata -from .util import make_tensor_proto - +from tornasole.core.tfevent.events_writer import EventsWriter +from tornasole.core.tfevent.proto.event_pb2 import Event from tornasole.core.tfevent.index_file_writer import EventWithIndex from tornasole.core.utils import get_relative_event_file_path -from tornasole.core.modes import MODE_STEP_PLUGIN_NAME, MODE_PLUGIN_NAME from tornasole.core.utils import parse_worker_name_from_file @@ -42,18 +34,6 @@ def size_and_shape(t): return (t.nbytes, t.shape) -def make_numpy_array(x): - if isinstance(x, np.ndarray): - return x - elif np.isscalar(x): - return np.array([x]) - elif isinstance(x, tuple): - return np.asarray(x, dtype=x.dtype) - else: - raise 
TypeError('_make_numpy_array only accepts input types of numpy.ndarray, scalar,' - ' while received type {}'.format(str(type(x)))) - - def _get_sentinel_event(): """Generate a sentinel event for terminating worker.""" return Event() @@ -95,26 +75,10 @@ def __init__(self, path, max_queue=10, flush_secs=120, ) self._worker.start() - def write_tensor(self, tdata, tname, write_index, - global_step, mode, mode_step): - sm1 = SummaryMetadata.PluginData(plugin_name='tensor') - sm2 = SummaryMetadata.PluginData(plugin_name=MODE_STEP_PLUGIN_NAME, - content=str(mode_step)) - sm3 = SummaryMetadata.PluginData(plugin_name=MODE_PLUGIN_NAME, - content=str(mode.value)) - plugin_data = [sm1, sm2, sm3] - smd = SummaryMetadata(plugin_data=plugin_data) - - value = make_numpy_array(tdata) - tag = tname - tensor_proto = make_tensor_proto(nparray_data=value, tag=tag) - s = Summary(value=[Summary.Value(tag=tag, metadata=smd, - tensor=tensor_proto)]) - if write_index: - self.write_summary_with_index( - s, global_step, tname, mode, mode_step) - else: - self.write_summary(s, global_step) + def write_graph(self, graph): + """Adds a `Graph` protocol buffer to the event file.""" + event = Event(graph_def=graph.SerializeToString()) + self.write_event(event) def write_summary(self, summary, step): event = Event(summary=summary) @@ -172,7 +136,7 @@ def run(self): event_in_queue = self._queue.get() if isinstance(event_in_queue, EventWithIndex): - # checking whether there is an object of IndexArgs, + # checking whether there is an object of EventWithIndex, # which is written by write_summary_with_index event = event_in_queue.event else: @@ -188,14 +152,15 @@ def run(self): # write index if isinstance(event_in_queue, EventWithIndex): eventfile = self._ev_writer.name() - tname = event_in_queue.tensorname - mode = event_in_queue.get_mode() - mode_step = event_in_queue.mode_step eventfile = get_relative_event_file_path(eventfile) tensorlocation = TensorLocation( - tname, mode, mode_step, eventfile, - positions[0], positions[1], parse_worker_name_from_file(eventfile) - ) + tname=event_in_queue.tensorname, + mode=event_in_queue.get_mode(), + mode_step=event_in_queue.mode_step, + event_file_name=eventfile, + start_idx=positions[0], + length=positions[1], + worker=parse_worker_name_from_file(eventfile)) self._ev_writer.index_writer.add_index(tensorlocation) # Flush the event writer every so often. now = time.time() diff --git a/tornasole/core/tfevent/summary.py b/tornasole/core/tfevent/summary.py new file mode 100644 index 0000000000..0eaa5a0307 --- /dev/null +++ b/tornasole/core/tfevent/summary.py @@ -0,0 +1,148 @@ +import re + +import numpy as np +from .util import make_numpy_array +from .proto.summary_pb2 import Summary, HistogramProto + +_INVALID_TAG_CHARACTERS = re.compile(r'[^-/\w\.]') + + +def _clean_tag(name): + """Cleans a tag. Removes illegal characters for instance. + Adapted from the TensorFlow function `clean_tag()` at + https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/ops/summary_op_util.py + Parameters + ---------- + name : str + The original tag name to be processed. + Returns + ------- + The cleaned tag name. + """ + # In the past, the first argument to summary ops was a tag, which allowed + # arbitrary characters. Now we are changing the first argument to be the node + # name. This has a number of advantages (users of summary ops now can + # take advantage of the tf name scope system) but risks breaking existing + # usage, because a much smaller set of characters are allowed in node names. 
+ # This function replaces all illegal characters with _s, and logs a warning. + # It also strips leading slashes from the name. + if name is not None: + new_name = _INVALID_TAG_CHARACTERS.sub('_', name) + new_name = new_name.lstrip('/') # Remove leading slashes + if new_name != name: + name = new_name + return name + + +def scalar_summary(tag, scalar): + """Outputs a `Summary` protocol buffer containing a single scalar value. + The generated Summary has a Tensor.proto containing the input Tensor. + Adapted from the TensorFlow function `scalar()` at + https://github.com/tensorflow/tensorflow/blob/r1.6/tensorflow/python/summary/summary.py + Parameters + ---------- + tag : str + A name for the generated summary. Will also serve as the series name in TensorBoard. + scalar : int, MXNet `NDArray`, or `numpy.ndarray` + A scalar value or an ndarray of shape (1,). + Returns + ------- + A `Summary` protobuf of the `scalar` value. + Raises + ------ + ValueError: If the scalar has the wrong shape or type. + """ + tag = _clean_tag(tag) + scalar = make_numpy_array(scalar) + assert(scalar.squeeze().ndim == 0), 'scalar should be 0D' + scalar = float(scalar) + return Summary(value=[Summary.Value(tag=tag, simple_value=scalar)]) + + +def _make_histogram(values, bins, max_bins): + """Converts values into a histogram proto using logic from + https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/lib/histogram/histogram.cc""" + if values.size == 0: + raise ValueError('The input has no element.') + values = values.reshape(-1) + counts, limits = np.histogram(values, bins=bins) + num_bins = len(counts) + if max_bins is not None and num_bins > max_bins: + subsampling = num_bins // max_bins + subsampling_remainder = num_bins % subsampling + if subsampling_remainder != 0: + counts = np.pad(counts, pad_width=[[0, subsampling - subsampling_remainder]], + mode="constant", constant_values=0) + counts = counts.reshape(-1, subsampling).sum(axis=-1) + new_limits = np.empty((counts.size + 1,), limits.dtype) + new_limits[:-1] = limits[:-1:subsampling] + new_limits[-1] = limits[-1] + limits = new_limits + + # Find the first and the last bin defining the support of the histogram: + cum_counts = np.cumsum(np.greater(counts, 0, dtype=np.int32)) + start, end = np.searchsorted(cum_counts, [0, cum_counts[-1] - 1], side="right") + start = int(start) + end = int(end) + 1 + del cum_counts + + # TensorBoard only includes the right bin limits. To still have the leftmost limit + # included, we include an empty bin left. + # If start == 0, we need to add an empty one left, otherwise we can just include the bin left to the + # first nonzero-count bin: + counts = counts[start - 1:end] if start > 0 else np.concatenate([[0], counts[:end]]) + limits = limits[start:end + 1] + + if counts.size == 0 or limits.size == 0: + raise ValueError('The histogram is empty, please file a bug report.') + + sum_sq = values.dot(values) + return HistogramProto(min=values.min(), + max=values.max(), + num=len(values), + sum=values.sum(), + sum_squares=sum_sq, + bucket_limit=limits.tolist(), + bucket=counts.tolist()) + + +def _get_default_bins(): + """Ported from the C++ function InitDefaultBucketsInner() in the following file. + https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/lib/histogram/histogram.cc + See the following tutorial for more details on how TensorFlow initialize bin distribution. 
+ https://www.tensorflow.org/programmers_guide/tensorboard_histograms""" + v = 1E-12 + buckets = [] + neg_buckets = [] + while v < 1E20: + buckets.append(v) + neg_buckets.append(-v) + v *= 1.1 + return neg_buckets[::-1] + [0] + buckets + + +def histogram_summary(tag, values, bins, max_bins=None): + """Outputs a `Summary` protocol buffer with a histogram. + Adding a histogram summary makes it possible to visualize the data's distribution in + TensorBoard. See detailed explanation of the TensorBoard histogram dashboard at + https://www.tensorflow.org/get_started/tensorboard_histograms + This op reports an `InvalidArgument` error if any value is not finite. + Adapted from the TensorFlow function `histogram()` at + https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/summary/summary.py + and Pytorch's function + https://github.com/pytorch/pytorch/blob/2655b2710c8f0b3253fe2cfe0d2674f5d3592d22/torch/utils/tensorboard/summary.py#L232 + Parameters + ---------- + tag : str + A name for the summary of the histogram. Will also serve as a series name in + TensorBoard. + values : `numpy.ndarray` + Values for building the histogram. + Returns + ------- + A `Summary` protobuf of the histogram. + """ + tag = _clean_tag(tag) + values = make_numpy_array(values) + hist = _make_histogram(values.astype(float), bins, max_bins) + return Summary(value=[Summary.Value(tag=tag, histo=hist)]) \ No newline at end of file diff --git a/tornasole/core/tfevent/util.py b/tornasole/core/tfevent/util.py index 55b6bb76fb..de658ee82e 100644 --- a/tornasole/core/tfevent/util.py +++ b/tornasole/core/tfevent/util.py @@ -44,3 +44,16 @@ def make_tensor_proto(nparray_data, tag): sb = bytes(s,encoding='utf-8') tensor_proto.string_val.append(sb) return tensor_proto + + +def make_numpy_array(x): + if isinstance(x, np.ndarray): + return x + elif np.isscalar(x): + return np.array([x]) + elif isinstance(x, tuple): + return np.asarray(x, dtype=x.dtype) + else: + raise TypeError('_make_numpy_array only accepts input types of numpy.ndarray, scalar,' + ' while received type {}'.format(str(type(x)))) + diff --git a/tornasole/core/tfrecord/record_reader.py b/tornasole/core/tfrecord/record_reader.py index de099d920c..1c6dc34afa 100644 --- a/tornasole/core/tfrecord/record_reader.py +++ b/tornasole/core/tfrecord/record_reader.py @@ -55,23 +55,26 @@ def has_data(self): #print("HASDATA=", has) return has - def read_record(self, check=True): + def read_record(self, check='minimal'): strlen_bytes = self._reader.read(8) strlen = struct.unpack('Q', strlen_bytes)[0] saved_len_crc = struct.unpack('I', self._reader.read(4))[0] - if check: + + if check in ['minimal', 'full']: computed_len_crc = masked_crc32c(strlen_bytes) assert saved_len_crc == computed_len_crc payload = self._reader.read(strlen) saved_payload_crc = struct.unpack('I', self._reader.read(4))[0] - if check: + if check == 'full': computed_payload_crc = masked_crc32c(payload) assert saved_payload_crc == computed_payload_crc - else: + elif check == 'minimal': computed_payload_crc = masked_crc32c(CHECKSUM_MAGIC_BYTES) assert saved_payload_crc == computed_payload_crc - + else: + # no check + pass return payload def flush(self): diff --git a/tornasole/core/tfrecord/tensor_reader.py b/tornasole/core/tfrecord/tensor_reader.py index eb2499a127..d800c428bf 100644 --- a/tornasole/core/tfrecord/tensor_reader.py +++ b/tornasole/core/tfrecord/tensor_reader.py @@ -1,9 +1,14 @@ import struct + from tornasole.core.tfevent.proto.event_pb2 import Event from 
tornasole.core.tfevent.event_file_reader import get_tensor_data from tornasole.core.tfrecord.record_reader import masked_crc32c from tornasole.core.tfrecord.record_writer import CHECKSUM_MAGIC_BYTES from tornasole.core.modes import ModeKeys, MODE_PLUGIN_NAME, MODE_STEP_PLUGIN_NAME +from tornasole.core.logger import get_logger + +logger = get_logger() + class TensorReader: def __init__(self, data): @@ -16,45 +21,53 @@ def _read(self, n): self.position += n return data - def read_record(self, check=False): + def read_record(self, check='minimal'): payload = None strlen_bytes = self._read(8) # will give you payload for the record, which is essentially the event. strlen = struct.unpack('Q', strlen_bytes)[0] saved_len_crc = struct.unpack('I', self._read(4))[0] - if check: + + if check in ['minimal', 'full']: computed_len_crc = masked_crc32c(strlen_bytes) assert saved_len_crc == computed_len_crc + payload = self._read(strlen) saved_payload_crc = struct.unpack('I', self._read(4))[0] - if check: + if check == 'full': computed_payload_crc = masked_crc32c(payload) assert saved_payload_crc == computed_payload_crc - else: + elif check == 'minimal': computed_payload_crc = masked_crc32c(CHECKSUM_MAGIC_BYTES) assert saved_payload_crc == computed_payload_crc + else: + # no check + pass return payload - def read_tensors(self, check=False): + def _get_mode_modestep(self, step, plugin_data): + mode_step = step + mode = ModeKeys.GLOBAL + for metadata in plugin_data: + if metadata.plugin_name == MODE_STEP_PLUGIN_NAME: + mode_step = int(metadata.content) + if metadata.plugin_name == MODE_PLUGIN_NAME: + mode = ModeKeys(int(metadata.content)) + return mode, mode_step + + def read_tensors(self, check='minimal'): for (step,summ) in self.read_summaries(check=check): for v in summ.value: - assert v.WhichOneof('value') == 'tensor' + val = v.WhichOneof('value') + assert val == 'tensor' tensor_name = v.tag # We have found the right tensor at the right step tensor_data = get_tensor_data(v.tensor) - - # default values - # todo: validate the logic extensively - mode_step = step - mode = ModeKeys.GLOBAL - for metadata in v.metadata.plugin_data: - if metadata.plugin_name == MODE_STEP_PLUGIN_NAME: - mode_step = int(metadata.content) - if metadata.plugin_name == MODE_PLUGIN_NAME: - mode = ModeKeys(int(metadata.content)) + mode, mode_step = self._get_mode_modestep( + step, v.metadata.plugin_data) yield (tensor_name, step, tensor_data, mode, mode_step) - def read_summaries(self, check=False): + def read_summaries(self, check='minimal'): for ev in self.read_events(check=check): #assert ev.HasField('step') if not ev.HasField('summary'): @@ -62,7 +75,7 @@ def read_summaries(self, check=False): assert ev.HasField('summary') yield (ev.step, ev.summary) - def read_events(self, check=False): + def read_events(self, check='minimal'): while self.has_data(): rec = self.read_record(check=check) event = Event() diff --git a/tornasole/core/utils.py b/tornasole/core/utils.py index d8565e204a..41561a3736 100644 --- a/tornasole/core/utils.py +++ b/tornasole/core/utils.py @@ -145,10 +145,16 @@ def step_in_range(range_steps, step): def get_relative_event_file_path(path): p = Path(path) path_parts = p.parts - assert path_parts[-3] == "events" + assert path_parts[-3] in ["events", "tensorboard"], print(path) return os.path.join(*path_parts[-3:]) +def size_and_shape(t): + if type(t) == bytes or type(t) == str: + return (len(t), [len(t)]) + return (t.nbytes, t.shape) + + def parse_worker_name_from_file(filename): # worker_2 = 
/tmp/ts-logs/index/000000001/000000001230_worker_2.json worker_name_regex = re.compile('.+\/\d+_(.+)\.(json|csv|tfevents)$') diff --git a/tornasole/core/writer.py b/tornasole/core/writer.py index 24f1cbf543..c0d6ae28b9 100644 --- a/tornasole/core/writer.py +++ b/tornasole/core/writer.py @@ -16,18 +16,27 @@ # under the License. """APIs for logging data in the event file.""" -from .locations import EventFileLocation, IndexFileLocationUtils +from tornasole.core.tfevent.util import make_tensor_proto from tornasole.core.tfevent.event_file_writer import EventFileWriter +from tornasole.core.tfevent.summary import make_numpy_array, histogram_summary, \ + _get_default_bins, scalar_summary from tornasole.core.tfevent.index_file_writer import IndexWriter +from tornasole.core.tfevent.proto.event_pb2 import Event, TaggedRunMetadata +from tornasole.core.tfevent.proto.summary_pb2 import Summary, SummaryMetadata +from tornasole.core.modes import MODE_STEP_PLUGIN_NAME, MODE_PLUGIN_NAME +from .logger import get_logger +from .locations import TensorFileLocation, IndexFileLocationUtils, TensorboardFileLocation +from .modes import ModeKeys import socket -from .modes import ModeKeys + +logger = get_logger() class FileWriter: def __init__(self, trial_dir, step=0, worker=None, - wtype='tensor', + wtype='events', mode=ModeKeys.GLOBAL, max_queue=10, flush_secs=120, verbose=False, write_checksum=False): """Creates a `FileWriter` and an file. @@ -56,13 +65,19 @@ def __init__(self, trial_dir, step=0, worker=None, if worker is None: self.worker = socket.gethostname() - index_file_path = IndexFileLocationUtils.get_index_key_for_step( - self.trial_dir, self.step, self.worker) - self.index_writer = IndexWriter(index_file_path) - - if wtype == 'tensor': - el = EventFileLocation(step_num=self.step, worker_name=self.worker) - event_file_path = el.get_location(trial_dir=self.trial_dir) + self.mode = mode + if wtype == 'events': + el = TensorFileLocation(step_num=self.step, worker_name=self.worker) + event_file_path = el.get_file_location(trial_dir=self.trial_dir) + index_file_path = IndexFileLocationUtils.get_index_key_for_step( + self.trial_dir, self.step, self.worker) + self.index_writer = IndexWriter(index_file_path) + elif wtype == 'tensorboard': + el = TensorboardFileLocation(step_num=self.step, + worker_name=self.worker, + mode=self.mode) + event_file_path = el.get_file_location(trial_dir=self.trial_dir) + self.index_writer = None else: assert False, 'Writer type not supported: {}'.format(wtype) @@ -71,6 +86,7 @@ def __init__(self, trial_dir, step=0, worker=None, max_queue=max_queue, flush_secs=flush_secs, verbose=verbose, write_checksum=write_checksum ) + self._default_bins = _get_default_bins() def __enter__(self): """Make usable with "with" statement.""" @@ -80,21 +96,85 @@ def __exit__(self, unused_type, unused_value, unused_traceback): """Make usable with "with" statement.""" self.close() + @staticmethod + def _get_metadata(mode, mode_step): + sm2 = SummaryMetadata.PluginData(plugin_name=MODE_STEP_PLUGIN_NAME, + content=str(mode_step)) + sm3 = SummaryMetadata.PluginData(plugin_name=MODE_PLUGIN_NAME, + content=str(mode.value)) + plugin_data = [ + sm2, + sm3] + smd = SummaryMetadata(plugin_data=plugin_data) + return smd + def write_tensor(self, tdata, tname, write_index=True, mode=ModeKeys.GLOBAL, mode_step=None): mode, mode_step = self._check_mode_step(mode, mode_step, self.step) - self._writer.write_tensor(tdata, tname, write_index, - global_step=self.step, - mode=mode, mode_step=mode_step) - - def 
write_summary(self, summ, tname, global_step, write_index=True, - mode=ModeKeys.GLOBAL, mode_step=None): - mode, mode_step = self._check_mode_step(mode, mode_step, global_step) + smd = self._get_metadata(mode, mode_step) + value = make_numpy_array(tdata) + tag = tname + tensor_proto = make_tensor_proto(nparray_data=value, tag=tag) + s = Summary(value=[Summary.Value(tag=tag, metadata=smd, + tensor=tensor_proto)]) if write_index: self._writer.write_summary_with_index( - summ, global_step, tname, mode, mode_step) + s, self.step, tname, mode, mode_step) else: - self._writer.write_summary(summ, global_step) + self._writer.write_summary(s, self.step) + + def write_graph(self, graph): + self._writer.write_graph(graph) + + def write_pytorch_graph(self, graph_profile): + # https://github.com/pytorch/pytorch/blob/c749be9e9f8dd3db8b3582e93f917bd47e8e9e20/torch/utils/tensorboard/writer.py # L99 + # graph_profile = pytorch_graph.graph(self.model) + graph = graph_profile[0] + stepstats = graph_profile[1] + event = Event(graph_def=graph.SerializeToString()) + self._writer.write_event(event) + trm = TaggedRunMetadata( + tag='step1', run_metadata=stepstats.SerializeToString()) + event = Event(tagged_run_metadata=trm) + self._writer.write_event(event) + + def write_summary(self, summ, global_step): + self._writer.write_summary(summ, global_step) + + def write_histogram_summary(self, tdata, tname, global_step, bins='default'): + """Add histogram data to the event file. + Parameters + ---------- + tname : str + Name for the `values`. + tdata: `numpy.ndarray` + Values for building histogram. + global_step : int + Global step value to record. + bins : int or sequence of scalars or str + If `bins` is an int, it defines the number equal-width bins in the range + `(values.min(), values.max())`. + If `bins` is a sequence, it defines the bin edges, including the rightmost edge, + allowing for non-uniform bin width. + If `bins` is a str equal to 'default', it will use the bin distribution + defined in TensorFlow for building histogram. + Ref: https://www.tensorflow.org/programmers_guide/tensorboard_histograms + The rest of supported strings for `bins` are 'auto', 'fd', 'doane', 'scott', + 'rice', 'sturges', and 'sqrt'. etc. See the documentation of `numpy.histogram` + for detailed definitions of those strings. + https://docs.scipy.org/doc/numpy/reference/generated/numpy.histogram.html + """ + if bins == 'default': + bins = self._default_bins + try: + s = histogram_summary(tname, tdata, bins) + self._writer.write_summary(s, global_step) + except ValueError as e: + logger.error(f'Unable to write histogram {tname} at {global_step}') + + def write_scalar_summary(self, name, value, global_step): + s = scalar_summary(name, value) + self._writer.write_summary(s, global_step) def flush(self): """Flushes the event file to disk. @@ -108,7 +188,8 @@ def close(self): Call this method when you do not need the summary writer anymore. 
""" self._writer.close() - self.index_writer.close() + if self.index_writer is not None: + self.index_writer.close() def name(self): return self._writer.name() diff --git a/tornasole/mxnet/hook.py b/tornasole/mxnet/hook.py index 0da6ddc9e6..1e5e7e6535 100644 --- a/tornasole/mxnet/hook.py +++ b/tornasole/mxnet/hook.py @@ -1,15 +1,11 @@ -import logging import mxnet as mx -from tornasole.core.hook import CallbackHook -from tornasole.core.logger import get_logger -from tornasole.core.json_config import TORNASOLE_CONFIG_DEFAULT_WORKER_NAME, create_hook_from_json_config - -import logging from tornasole.core.collection import CollectionKeys +from tornasole.core.hook import CallbackHook +from tornasole.core.json_config import TORNASOLE_CONFIG_DEFAULT_WORKER_NAME, \ + create_hook_from_json_config from tornasole.mxnet.mxnet_collection import get_collection_manager from tornasole.mxnet.utils import get_reduction_of_data, make_numpy_array - -logger = get_logger() +# from tornasole.mxnet.graph import _net2pb DEFAULT_INCLUDE_COLLECTIONS = [CollectionKeys.LOSSES] @@ -46,6 +42,8 @@ def __init__(self, if CollectionKeys.LOSSES not in self.include_collections: self.include_collections.append(CollectionKeys.LOSSES) self.last_block = None + self.model = None + self.exported_model = False def get_worker_name(self): return TORNASOLE_CONFIG_DEFAULT_WORKER_NAME @@ -61,6 +59,8 @@ def _cleanup(self): # Write the gradients of the past step if the writer is still available. if self.writer is not None and self.last_block is not None: self.log_params(self.last_block) + if self.exported_model is False: + self._export_model() super()._cleanup() def log_params(self, block): @@ -69,11 +69,23 @@ def log_params(self, block): self.log_param(param) def log_param(self, param): - self._write_tensor(tensor_name=param.name, tensor_value=param.data(param.list_ctx()[0])) + self._save_for_tensor(tensor_name=param.name, tensor_value=param.data(param.list_ctx()[0])) # If Gradient for this param is available if param.grad_req != 'null': - self._write_tensor(tensor_name=self.GRADIENT_PREFIX + param.name, - tensor_value=param.grad(param.list_ctx()[0])) + self._save_for_tensor(tensor_name=self.GRADIENT_PREFIX + param.name, + tensor_value=param.grad(param.list_ctx()[0])) + + def _export_model(self): + pass + # if self.model is not None: + # try: + # self._get_tb_writer().write_graph(_net2pb(self.model)) + # except (RuntimeError, TypeError) as e: + # self.logger.warning( + # f'Could not export model graph for tensorboard ' + # f'due to the mxnet exception: {e}') + # else: + # self.logger.warning('Tornasole does not know the model') # This hook is invoked by trainer prior to running the forward pass. def forward_pre_hook(self, block, inputs): @@ -81,7 +93,8 @@ def forward_pre_hook(self, block, inputs): # Write the params and gradients of the # past step if the writer is still available. 
self.log_params(block) - self._flush_and_close_writer() + self._close_writer() + self._close_tb_writer() if not self.prepared_collections: # at this point we need all collections to be ready @@ -92,18 +105,23 @@ def forward_pre_hook(self, block, inputs): self._increment_step() - if self._process_step(): + if self._get_collections_to_save_for_step(): self._initialize_writer() + if self.exported_model is False: + self._export_model() + self.exported_model = True + if self.last_saved_step is not None and not self.exported_collections: self.export_collections() + self._export_model() self.exported_collections = True + self.last_block = block # This hook is invoked by trainer after running the forward pass. def forward_hook(self, block, inputs, outputs): - if self.collections_in_this_step is None: - logging.debug("Skipping the global step {0}".format(self.step)) + if not self._get_collections_to_save_for_step(): return block_name = block.name @@ -150,16 +168,18 @@ def register_hook(self, block): the default collectors viz. gradients, weight and bias """ if not isinstance(block, mx.gluon.Block): - logger.error("The given block type {0} is not " - "currently supported by Tornasole Hook" - .format(block.__class__.__name__)) + self.logger.error( + f"The given block type {block.__class__.__name__} is not " + f"currently supported by Tornasole Hook") return # Skip the forward pre hook for the Loss blocks. if isinstance(block, mx.gluon.loss.Loss): - logger.info("Registering hook for block {0}".format(block.name)) + self.logger.info(f"Registering hook for block {block.name}") block.register_forward_hook(self.forward_hook) return + else: + self.model = block is_recursive = self._is_recursive_needed() block.register_forward_pre_hook(self.forward_pre_hook) diff --git a/tornasole/pytorch/hook.py b/tornasole/pytorch/hook.py index 93bf708ae7..0118c7adeb 100644 --- a/tornasole/pytorch/hook.py +++ b/tornasole/pytorch/hook.py @@ -1,16 +1,14 @@ -import importlib +from copy import deepcopy import torch import torch.distributed as dist -import logging +from tornasole.core.json_config import create_hook_from_json_config, \ + TORNASOLE_CONFIG_DEFAULT_WORKER_NAME +from tornasole.core.logger import get_logger from tornasole.core.hook import CallbackHook from tornasole.core.collection import CollectionKeys -from tornasole.core.logger import get_logger -from tornasole.core.json_config import create_hook_from_json_config from tornasole.pytorch.collection import get_collection_manager from tornasole.pytorch.utils import get_reduction_of_data, make_numpy_array -from tornasole.core.json_config import TORNASOLE_CONFIG_DEFAULT_WORKER_NAME - -logger = get_logger() +# from tornasole.pytorch._pytorch_graph import graph as create_graph DEFAULT_INCLUDE_COLLECTIONS = [ CollectionKeys.WEIGHTS, @@ -45,6 +43,8 @@ def __init__(self, # mapping of module objects to their names, # useful in forward hook for logging input/output of modules self.module_maps = dict() + self.model = None + self.exported_model = False def get_num_workers(self): """Check horovod and torch.distributed.""" @@ -91,12 +91,27 @@ def log_params(self, module): pname = module_name + '_' + name # This overwhelms the logs; turn back on if you really need it # self.logger.debug( - # "Processing the global step {0} for parameter {1}".format(self.step, pname)) - self._write_tensor(tensor_name=pname, tensor_value=param.data) + # "Processing the global step {0} for parameter {1}".format(self.step, pname)) + self._save_for_tensor(tensor_name=pname, tensor_value=param.data) + + 
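
The writer changes above split FileWriter into two flavors: wtype='events' keeps the indexed tensor event files under the trial dir, while wtype='tensorboard' writes per-mode summary files with no index writer, plus the new write_scalar_summary, write_histogram_summary and write_graph helpers. A minimal sketch of how a caller might drive the tensorboard-style writer; the trial directory and worker name are hypothetical, illustrative values only:

    import numpy as np

    from tornasole.core.modes import ModeKeys
    from tornasole.core.writer import FileWriter

    # hypothetical output location; any writable directory works for this sketch
    writer = FileWriter(trial_dir='/tmp/ts-demo', step=0, worker='worker_0',
                        wtype='tensorboard', mode=ModeKeys.GLOBAL)

    # scalar summaries are wrapped in a Summary proto with simple_value set
    writer.write_scalar_summary('loss', 0.42, global_step=0)

    # bins='default' uses the exponential bucket scheme from _get_default_bins()
    writer.write_histogram_summary(tdata=np.random.randn(1000),
                                   tname='weights/dense1',
                                   global_step=0)
    writer.close()

Because wtype='tensorboard' leaves index_writer as None, close() only closes the event file, matching the guarded close() in the writer diff.
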
def _export_model(self, inputs): + pass + # todo: export model when only run for 1 step in cleanup + # coming in separate PR + # if self.model is not None: + # try: + # self._get_tb_writer().write_pytorch_graph( + # create_graph(self.model, inputs)) + # except ValueError as e: + # self.logger.warning( + # f'Could not export model graph for tensorboard ' + # f'due to the pytorch exception: {e}') # This hook is invoked by trainer prior to running the forward pass. def forward_pre_hook(self, module, inputs): - self._flush_and_close_writer() + # Write the gradients of the past step if the writer is still available. + self._close_writer() + self._close_tb_writer() if not self.prepared_collections: # at this point we need all collections to be ready @@ -107,18 +122,21 @@ def forward_pre_hook(self, module, inputs): self._increment_step() - if self._process_step(): + if self._get_collections_to_save_for_step(): self._initialize_writer() self.log_params(module) + if self.exported_model is False: + self._export_model(inputs) + self.exported_model = True + if self.last_saved_step is not None and not self.exported_collections: self.export_collections() self.exported_collections = True # This hook is invoked by trainer after running the forward pass. def forward_hook(self, module, inputs, outputs): - if self.collections_in_this_step is None: - logging.debug("Skipping the global step {0}".format(self.step)) + if not self._get_collections_to_save_for_step(): return module_name = self.module_maps[module] @@ -133,12 +151,14 @@ def forward_hook(self, module, inputs, outputs): self.last_saved_step = self.step def backward_hook(self, tname): - # Helper function that has access to the parameter name via the scope in which it's defined. + # Helper function that has access to the parameter name via + # the scope in which it's defined. 
def back(grad): - if self._process_step(): + if self._get_collections_to_save_for_step(): if grad is not None: - logger.debug("Processing the backward step {0} for {1}".format(self.step, tname)) - self._write_tensor(tensor_name=self.GRADIENT_PREFIX + tname, tensor_value=grad) + self.logger.debug(f"Processing the backward step " + f"{self.step} for {tname}") + self._save_for_tensor(self.GRADIENT_PREFIX + tname, grad) return back def _backward_apply(self, module): @@ -166,6 +186,9 @@ def register_hook(self, module): if not isinstance(module, torch.nn.Module): raise ValueError(f"Module type {module.__class__.__name__} must be type torch.nn.Module") + # deepcopy the model because models with hooks can't be exported + self.model = deepcopy(module) + # Create a mapping from modules to their names for name, submodule in module.named_modules(): assert submodule not in self.module_maps, f"Don't register module={module} twice" diff --git a/tornasole/tensorflow/collection.py b/tornasole/tensorflow/collection.py index d5a3bc9637..c9a461dfbb 100644 --- a/tornasole/tensorflow/collection.py +++ b/tornasole/tensorflow/collection.py @@ -8,9 +8,9 @@ class Collection(BaseCollection): def __init__(self, name, include_regex=None, tensor_names=None, - reduction_config=None, save_config=None): + reduction_config=None, save_config=None, save_histogram=True): super().__init__(name, include_regex, tensor_names, - reduction_config, save_config) + reduction_config, save_config, save_histogram) self.tensors = [] # has the new tensors added to graph # reduction_tensor_names has the names of original tensors @@ -75,9 +75,11 @@ class CollectionManager(BaseCollectionManager): def __init__(self, collections=None, create_default=True): super().__init__(collections=collections) if create_default: - for n in [CollectionKeys.DEFAULT, CollectionKeys.WEIGHTS, + for n in [CollectionKeys.DEFAULT, + CollectionKeys.WEIGHTS, CollectionKeys.GRADIENTS, - CollectionKeys.LOSSES]: + CollectionKeys.LOSSES, + CollectionKeys.SCALARS]: self.create_collection(n) def create_collection(self, name): diff --git a/tornasole/tensorflow/hook.py b/tornasole/tensorflow/hook.py index 14bc73fdb2..004e7bfde9 100644 --- a/tornasole/tensorflow/hook.py +++ b/tornasole/tensorflow/hook.py @@ -1,20 +1,19 @@ -import numpy as np - +import tensorflow.compat.v1 as tf from .utils import * from .reductions import get_tensorflow_reduction -from .collection import * - -from tornasole.core.hook import BaseHook +from .collection import get_collection_manager +from tornasole.core.tfevent.proto.summary_pb2 import Summary from tornasole.core.utils import match_inc +from tornasole.core.collection import CollectionKeys, SUMMARIES_COLLECTIONS +from tornasole.core.hook import BaseHook from tornasole.core.reductions import get_reduction_tensor_name from tornasole.core.json_config import TORNASOLE_CONFIG_DEFAULT_WORKER_NAME, create_hook_from_json_config -DEFAULT_INCLUDE_COLLECTIONS = [ - CollectionKeys.WEIGHTS, - CollectionKeys.GRADIENTS, - CollectionKeys.DEFAULT, - CollectionKeys.LOSSES -] +DEFAULT_INCLUDE_COLLECTIONS = [CollectionKeys.WEIGHTS, + CollectionKeys.GRADIENTS, + CollectionKeys.DEFAULT, + CollectionKeys.LOSSES, + CollectionKeys.SCALARS] class TornasoleHook(tf.train.SessionRunHook, BaseHook): @@ -79,6 +78,9 @@ def __init__(self, out_dir=None, save_all=save_all) self.reduction_original_tensors = {} self.subgraph_nodes_cache = {} + self.summaries_original_tensors = {} + self.graph = None + self.tensors_to_save_this_step = None def get_worker_name(self): try: @@ -117,23 
+119,22 @@ def _prepare_tensors(self): def _process_matched_tensor(self, tensor, collection): reduction_config = collection.get_reduction_config() if reduction_config: - for reduction in reduction_config.reductions + reduction_config.norms: - self._add_reduction(tensor, reduction, collection, False) - for reduction in reduction_config.abs_reductions + reduction_config.abs_norms: - self._add_reduction(tensor, reduction, collection, True) - - # here if reduction config was set, - # but tensors were added to collection, - # they will be removed and added to reduction_tensors - try: - collection.remove_tensor(tensor) - except IndexError: - # was not in the list - pass - # so this is available in this collection for reader - # hook will read from tensors and reduction_tensors_added lists + for reduction_list in (reduction_config.reductions, reduction_config.norms): + for reduction in reduction_list: + self._add_reduction(tensor, reduction, collection, False) + for reduction_list in (reduction_config.abs_reductions, reduction_config.abs_norms): + for reduction in reduction_list: + self._add_reduction(tensor, reduction, collection, True) + + if reduction_config.save_raw_tensor is False: + # this is for export so that trial knows that this tensor + # belongs to this collection collection.add_tensor_name(tensor.name) - else: + + case1 = collection.name in [CollectionKeys.SCALARS, CollectionKeys.LOSSES] + case2 = collection.name in self.include_collections and collection.save_histogram is True + case3 = reduction_config.save_raw_tensor is True + if case1 or case2 or case3: collection.add(tensor) def _check_and_add_tensor(self, t): @@ -145,10 +146,19 @@ def _check_and_add_tensor(self, t): added = False for coll in self._get_all_collections_to_save(): + if coll.name in SUMMARIES_COLLECTIONS: + # these summaries directly does not take any regex patterns + # or tensor names. 
it just takes them from tf.summaries + # or added when other collections have write_histograms True + # this also ensures that we don't look at reductions for + # these collections + # note that for scalars collection we do + # look at regex patterns + continue + if match_inc(t.name, coll.get_include_regex()) \ or t.name in coll.tensor_names: self._process_matched_tensor(t, coll) - # only matches with one collection added = True return added @@ -161,32 +171,37 @@ def _add_reduction(self, tensor, reduction_name, collection, abs=False): collection.add_reduction_tensor(red_tensor, original_tensor=tensor) def _add_tensors(self): - # gradients and optimizer_variables added in user code or TornasoleOptimizer - + # gradients and optimizer_variables added + # in user code or TornasoleOptimizer total_tensor_count = 0 # todo: do we ever need inputs of the op for op in self.graph.get_operations(): for tensor in op.outputs: self._check_and_add_tensor(tensor) total_tensor_count += 1 - # all variables we are interested in are part of the graph tensors - # for variable in tf.global_variables(): - # self._check_and_add_tensor(variable) - # total_tensor_count += 1 return total_tensor_count - def begin(self): - # todo: handle multiple graphs in the model - self.graph = tf.get_default_graph() + def _add_summaries_tensors(self): + if CollectionKeys.TENSORFLOW_SUMMARIES in self.include_collections: + c = self.collection_manager.get(CollectionKeys.TENSORFLOW_SUMMARIES) + for t in tf.get_collection(tf.GraphKeys.SUMMARIES): + self.summaries_original_tensors[t.name] = t.op.inputs[1] + c.add(t) + def _clear_tensor_objects_from_collections(self): for coll_name, coll in self.collection_manager.get_collections().items(): - # hack to make multiple graphs work with the same tensor names + # to make multiple graphs work with the same tensor names # this can happen when we use same hook for training and evaluation # what is going on here is that we clear the tensors and reduction tensors # but we use the tensor names field in collection to readd tensors # from the new graph to the collection so we can them right coll.tensors = [] - coll.reduction_tensors = [] + coll.reduction_tensors_added = [] + + def begin(self): + # todo: should this be called first time a mode changes + # todo: handle multiple graphs in the model + self.graph = tf.get_default_graph() wts = tf.trainable_variables() self.collection_manager.get(CollectionKeys.WEIGHTS).add(wts) @@ -194,33 +209,36 @@ def begin(self): losses = tf.losses.get_losses() self.collection_manager.get(CollectionKeys.LOSSES).add(losses) - # at this point we need all collections to be ready - # this may not be the case at creation of hook - # as user's code after hook might add collections + # so that new graph does not cause conflicts for the graph + # property of tensors added + self._clear_tensor_objects_from_collections() + + # needs this so save_config for collection can be used in add_tensors self._prepare_collections() - # adds all tensors in graph based on regexes in collections default and other custom ones + self._add_summaries_tensors() self._add_tensors() + + # after all tensors are added self._prepare_tensors() for coll in self._get_all_collections_to_save(): - self.logger.info(f'Saving the collection {coll.name} with {len(coll.tensor_names)} tensors ' \ - f'and {len(coll.reduction_tensors_added)} reductions') + self.logger.info(f'Saving the collection {coll.name} with {len(coll.tensors)} tensors ' \ + f'and {len(coll.reduction_tensors_added)} reductions') 
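
Both readers above recover mode and mode_step from SummaryMetadata plugin entries, falling back to ModeKeys.GLOBAL and the global step when the entries are absent, while the writer attaches the same entries via FileWriter._get_metadata. A small round-trip sketch of that encoding, with made-up step numbers:

    from tornasole.core.modes import ModeKeys, MODE_PLUGIN_NAME, MODE_STEP_PLUGIN_NAME
    from tornasole.core.writer import FileWriter

    # encode: mode and mode_step are stored as stringified plugin contents
    smd = FileWriter._get_metadata(mode=ModeKeys.GLOBAL, mode_step=12)

    # decode: mirrors _get_mode_modestep, defaulting to the global step (7 here)
    mode, mode_step = ModeKeys.GLOBAL, 7
    for metadata in smd.plugin_data:
        if metadata.plugin_name == MODE_STEP_PLUGIN_NAME:
            mode_step = int(metadata.content)
        if metadata.plugin_name == MODE_PLUGIN_NAME:
            mode = ModeKeys(int(metadata.content))
    print(mode, mode_step)  # ModeKeys.GLOBAL 12
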
self.logger.debug(f' Collection {coll.name} has tensors: {coll.tensors}') self.logger.debug(f' Collection {coll.name} has reductions: {coll.reduction_tensors_added}') - self.export_collections() self._export_model() + self.export_collections() def _export_model(self): - # todo save model - pass + self.logger.info('Writing graph') + self._get_tb_writer().write_graph(self.graph.as_graph_def(add_shapes=True)) + # self._close_tb_writer() def _get_tensors_to_save_this_step(self): - colls_to_save = self._get_collections_to_save_for_step( - self.mode, self.mode_steps[self.mode]) tensors_to_save = {'watched': [], 'added': []} - for coll in colls_to_save: + for coll in self._get_collections_to_save_for_step(): tensors_to_save['watched'].extend(coll.tensors) tensors_to_save['added'].extend(coll.reduction_tensors_added) # dedup watched and added @@ -229,6 +247,7 @@ def _get_tensors_to_save_this_step(self): return tensors_to_save def _filter_to_be_saved(self, dict_to_save, fetches): + # todo: handle all types of complex fetches if not isinstance(fetches, list) and not isinstance(fetches, tuple) \ and not isinstance(fetches, dict): fetches = [fetches] @@ -251,6 +270,14 @@ def _filter_to_be_saved(self, dict_to_save, fetches): for tensor in dict_to_save['watched']: if node_name(tensor.name) in subgraph_nodes: filtered.append(tensor) + elif tensor.name in self.summaries_original_tensors.keys(): + ot = self.summaries_original_tensors[tensor.name] + if node_name(ot.name) in subgraph_nodes: + # summary should only be included + # if the original tensor is included for safety + filtered.append(tensor) + else: + skipped.append(tensor) else: skipped.append(tensor) for tensor in dict_to_save['added']: @@ -260,9 +287,11 @@ def _filter_to_be_saved(self, dict_to_save, fetches): filtered.append(tensor) else: skipped.append(tensor) - self.logger.debug(f'Skipped {len(skipped)} unreachable tensors: {skipped}') - # todo(huilgolr) can we filter tensors with (0)shape here + if len(skipped) > 0: + self.logger.debug(f'Skipped {len(skipped)} unreachable tensors: {skipped}') + + # todo(huilgolr) can we filter tensors with (0)shape here. do we want to? return filtered def before_run(self, run_context): @@ -276,30 +305,35 @@ def before_run(self, run_context): tensors_to_save['added'] else: list_to_save = [] - self.prev_to_be_saved = list_to_save + self.tensors_to_save_this_step = list_to_save return tf.train.SessionRunArgs(list_to_save) if list_to_save else None - def _save_tensor(self, tensor, value): - if tensor.dtype == np.float16: - # todo: save as fp16 itself. - # measure perf as proto doesn't handle that well - value = np.float32(value) - size_saved = value.nbytes - this_size, this_shape = size_and_shape(value) - if this_size > 0: - self.logger.debug(f' Saving {tensor.name}, type={tensor.dtype}, shape={this_shape},' + - f'size={this_size}') - if not self.dry_run: - self.writer.write_tensor(tdata=value, tname=tensor.name, - mode=self.mode, - mode_step=self.mode_steps[self.mode]) + def _write_tf_summary(self, tensor, value): + try: + # likely a summary + self.logger.debug( + f'Saving summary {tensor.name} with length {len(value)}') + s = Summary.FromString(value) + self._get_tb_writer().write_summary(s, self.step) + except Exception as e: + # can it not be a summary? 
+ self.logger.error( + f'Ran into the exception when saving {tensor}: {e}') + + def _write_for_tensor(self, tensor_name, tensor_value, save_collections): + # if reduction tensor + if tensor_name in self.reduction_original_tensors: + self._write_raw_tensor_simple(tensor_name, tensor_value) else: - self.logger.debug(f' Not saving {tensor.name}, type={tensor.dtype}, shape={this_shape},' + - f'size={this_size}') - return size_saved + self._write_histogram_summary(tensor_name, tensor_value, save_collections) + # skip writing reductions as TF handles them in the graph itself + + # save raw tensor if reduction config calls for that + self._write_raw_tensor(tensor_name, tensor_value, save_collections) + self._write_scalar_summary(tensor_name, tensor_value, save_collections) def _get_all_tensors_values(self, results): - for (item, value) in zip(self.prev_to_be_saved, results): + for (item, value) in zip(self.tensors_to_save_this_step, results): if not isinstance(value, list) or isinstance(value, tuple): assert not (isinstance(item, list) or isinstance(item, tuple)) yield item, value @@ -309,16 +343,25 @@ def _get_all_tensors_values(self, results): yield item[i], value[i] def after_run(self, run_context, run_values): - if self.prev_to_be_saved: + if self.tensors_to_save_this_step: self._initialize_writer() - running_size = 0 - for (item, value) in self._get_all_tensors_values(run_values.results): - running_size += self._save_tensor(item, value) - self.logger.info(f'Saved {running_size} bytes for ' - f'{len(self.prev_to_be_saved)} objects ' - f'at step {self.step}') - self._flush_and_close_writer() + for (tensor, value) in self._get_all_tensors_values(run_values.results): + if tensor.dtype == tf.string: + self._write_tf_summary(tensor, value) + else: + self._save_for_tensor(tensor.name, value, + check_before_write=False) + self._close_writer() + self._close_tb_writer() self._increment_step() + @staticmethod + def _make_numpy_array(tensor_value): + """ + Convert the tensor value into a numpy array. + Here it's already numpy array + """ + return tensor_value + def end(self, sess): pass diff --git a/tornasole/tensorflow/keras.py b/tornasole/tensorflow/keras.py index 7593ef1049..6297968923 100644 --- a/tornasole/tensorflow/keras.py +++ b/tornasole/tensorflow/keras.py @@ -1,10 +1,14 @@ import keras -from .collection import * +from .collection import get_collection_manager, CollectionKeys from tornasole.core.hook import BaseHook from tornasole.core.save_config import SaveConfig -DEFAULT_INCLUDE_COLLECTIONS=['weights', 'gradients', 'metrics'] +DEFAULT_INCLUDE_COLLECTIONS=[ + CollectionKeys.WEIGHTS, + CollectionKeys.GRADIENTS, + 'metrics' +] class TornasoleHook(keras.callbacks.Callback, BaseHook): @@ -21,7 +25,6 @@ def __init__(self, out_dir, if save_all is not None: msg = "'include_regex' is not yet supported and will be ignored." 
self.logger.warning(msg) - super().__init__(collection_manager=get_collection_manager(), default_include_collections=DEFAULT_INCLUDE_COLLECTIONS, out_dir=out_dir, @@ -50,9 +53,9 @@ def _export_collections( self, logs): tensor_name = cfg['name'] if multi: tensor_name += "_" + str(i) - self.collection_manager.get("weights").add_tensor_name(tensor_name) + self.collection_manager.get(CollectionKeys.WEIGHTS).add_tensor_name(tensor_name) - self.collection_manager.get('gradients').add([]) + self.collection_manager.get(CollectionKeys.GRADIENTS).add([]) # at this point we need all collections to be ready # this may not be the case at creation of hook @@ -66,7 +69,7 @@ def on_epoch_end(self, epoch, logs=None): if logs is None: logs = {} self.save_metrics(logs=logs, force=True) - self._flush_and_close_writer() + self._close_writer() def on_batch_end(self, batch, logs=None): if logs is None: @@ -74,13 +77,12 @@ def on_batch_end(self, batch, logs=None): self._export_collections(logs) self.save_metrics(logs=logs, force=False) self.save_layer_data() - self._flush_and_close_writer() + self._close_writer() self._increment_step() def save_metrics(self, logs, force): for k in logs: - if self._should_save_tensor_for_step( - k, self.mode, self.mode_steps[self.mode]) or force: + if self._should_save_tensor_for_step(k) or force: val = logs[k] self._initialize_writer() self.writer.write_tensor(tname=k, tdata=val, @@ -100,8 +102,7 @@ def save_layer_data(self): tensor_name = cfg['name'] if multi: tensor_name += "_" + str(i) - if self._should_save_tensor_for_step( - tensor_name, self.mode, self.mode_steps[self.mode]): + if self._should_save_tensor_for_step(tensor_name): self._initialize_writer() self.writer.write_tensor( tdata=tensor_value, diff --git a/tornasole/tensorflow/utils.py b/tornasole/tensorflow/utils.py index cfa8e14cea..bdba16ea74 100644 --- a/tornasole/tensorflow/utils.py +++ b/tornasole/tensorflow/utils.py @@ -45,7 +45,3 @@ def get_original_fetch_ops(fetches): raise RuntimeError('Invalid fetches') -def size_and_shape(t): - if type(t) == bytes or type(t) == str: - return (len(t), [len(t)]) - return (t.nbytes, t.shape) diff --git a/tornasole/trials/local_trial.py b/tornasole/trials/local_trial.py index 7bba820fe8..cdbb21539d 100644 --- a/tornasole/trials/local_trial.py +++ b/tornasole/trials/local_trial.py @@ -1,12 +1,10 @@ from .trial import EventFileTensor, Trial -from tornasole.core.locations import EventFileLocation +from tornasole.core.locations import TensorFileLocation from tornasole.core.collection_manager import CollectionManager from tornasole.core.reader import FileReader -from tornasole.core.utils import index, step_in_range, list_collection_files_in_directory, \ - get_worker_name_from_collection_file, parse_worker_name_from_file +from tornasole.core.utils import index, step_in_range, parse_worker_name_from_file -import time import os import multiprocessing import struct @@ -35,7 +33,7 @@ def _load_tensors_from_index_tensors(self, index_tensors_dict): def _load_tensors_from_event_files(self): try: - step_dirs = EventFileLocation.get_step_dirs(self.trial_dir) + step_dirs = TensorFileLocation.get_step_dirs(self.trial_dir) except FileNotFoundError: self.logger.debug('Waiting to see data for steps') return @@ -80,7 +78,7 @@ def _read_step_dirs(self, step_dirs): try: dirnames_efts = Parallel(n_jobs=multiprocessing.cpu_count(), verbose=0) \ (delayed(self._read_folder) \ - (EventFileLocation.get_step_dir_path(self.trial_dir, step_dir), + (TensorFileLocation.get_step_dir_path(self.trial_dir, 
step_dir), check=self.check) \ for step_dir in step_dirs) # sort them as parallel returns in random order @@ -92,7 +90,7 @@ def _read_step_dirs(self, step_dirs): self._read_step_dirs(step_dirs) else: for step_dir in step_dirs: - step_dir_path = EventFileLocation.get_step_dir_path(self.trial_dir, step_dir) + step_dir_path = TensorFileLocation.get_step_dir_path(self.trial_dir, step_dir) dirnames_efts.extend(self._read_folder(step_dir_path, check=self.check)) diff --git a/tornasole/trials/s3_trial.py b/tornasole/trials/s3_trial.py index e805436657..f83ef54280 100644 --- a/tornasole/trials/s3_trial.py +++ b/tornasole/trials/s3_trial.py @@ -2,7 +2,7 @@ from tornasole.core.access_layer.s3handler import ReadObjectRequest, S3Handler from tornasole.core.s3_utils import list_s3_objects -from tornasole.core.locations import EventFileLocation +from tornasole.core.locations import TensorFileLocation from tornasole.core.collection_manager import CollectionManager from tornasole.core.tfrecord.tensor_reader import TensorReader from tornasole.core.utils import step_in_range @@ -65,7 +65,7 @@ def _load_tensors_from_event_files(self, start_after_key=None): start_after_key) self.logger.debug("Got objects:{}".format(objects)) for objname in objects: - efl = EventFileLocation.match_regex(objname) + efl = TensorFileLocation.match_regex(objname) if efl: if (self.range_steps is not None and step_in_range(self.range_steps, efl.step_num)) or \ self.range_steps is None: diff --git a/tornasole/trials/trial.py b/tornasole/trials/trial.py index 3e114b2c8b..0a92f340d8 100644 --- a/tornasole/trials/trial.py +++ b/tornasole/trials/trial.py @@ -7,15 +7,15 @@ from tornasole.core.tensor import Tensor, StepState from tornasole.exceptions import * from tornasole.analysis.utils import refresh - -from tornasole.core.locations import EventFileLocation -from tornasole.core.utils import flatten, is_s3, list_collection_files_in_directory, get_worker_name_from_collection_file -from tornasole.core.s3_utils import list_s3_objects, parse_collection_files_from_s3_objects +from tornasole.core.locations import TensorFileLocation +from tornasole.core.utils import flatten, is_s3, \ + list_collection_files_in_directory, get_worker_name_from_collection_file +from tornasole.core.s3_utils import list_s3_objects, \ + parse_collection_files_from_s3_objects from tornasole.core.logger import get_logger from tornasole.core.reductions import TORNASOLE_REDUCTIONS_PREFIX, \ reverse_reduction_tensor_name from tornasole.core.modes import ModeKeys - from tornasole.core.locations import TensorLocation from tornasole.core import index_reader @@ -23,7 +23,7 @@ class EventFileTensor: def __init__(self, filename, tensor_name, step_num, tensor_value, mode=None, mode_step=None, worker=None): - self.location = EventFileLocation.load_filename(filename) + self.location = TensorFileLocation.load_filename(filename) self.tensorname = tensor_name self.tensor_value = tensor_value self.step_num = step_num diff --git a/tornasole/xgboost/hook.py b/tornasole/xgboost/hook.py index 19ab684fae..9f19fc9304 100644 --- a/tornasole/xgboost/hook.py +++ b/tornasole/xgboost/hook.py @@ -7,6 +7,7 @@ from tornasole.core.collection import Collection, CollectionKeys from tornasole.core.save_config import SaveConfig from tornasole.core.hook import CallbackHook +from tornasole.core.tfevent.util import make_numpy_array from tornasole.core.access_layer.utils import training_has_ended from tornasole.core.json_config import create_hook_from_json_config @@ -114,7 +115,11 @@ def _is_last_step(self, 
env: CallbackEnv) -> bool: return env.iteration + 1 == env.end_iteration def _is_collection_being_saved_for_step(self, name): - return self.collection_manager.get(name) in self.collections_in_this_step + return self.collection_manager.get(name) in self._get_collections_to_save_for_step() + + def _increment_step(self, iteration): + self.step = self.mode_steps[self.mode] = iteration + self._collections_to_save_for_step = None def _callback(self, env: CallbackEnv) -> None: # env.rank: rabit rank of the node/process. master node has rank 0. @@ -133,9 +138,9 @@ def _callback(self, env: CallbackEnv) -> None: self.export_collections() self.exported_collections = True - self.step = self.mode_steps[self.mode] = env.iteration + self._increment_step(env.iteration) - if not self._is_last_step(env) and not self._process_step(): + if not self._is_last_step(env) and not self._get_collections_to_save_for_step(): self.logger.debug("Skipping iteration {}".format(self.step)) return @@ -162,7 +167,7 @@ def _callback(self, env: CallbackEnv) -> None: self.write_average_shap(env) if not self._is_last_step(env): - self._flush_and_close_writer() + self._close_writer() if self._is_last_step(env): self._cleanup() @@ -172,13 +177,13 @@ def _callback(self, env: CallbackEnv) -> None: def write_metrics(self, env: CallbackEnv): # Get metrics measured at current boosting round for metric_name, metric_data in env.evaluation_result_list: - self._write_tensor(metric_name, metric_data) + self._save_for_tensor(metric_name, metric_data) def write_predictions(self, env: CallbackEnv): # Write predictions y_hat from validation data if not self.validation_data: return - self._write_tensor( + self._save_for_tensor( "predictions", env.model.predict(self.validation_data)) @@ -186,7 +191,7 @@ def write_labels(self, env: CallbackEnv): # Write labels y from validation data if not self.validation_data: return - self._write_tensor("labels", self.validation_data.get_label()) + self._save_for_tensor("labels", self.validation_data.get_label()) def write_feature_importances(self, env: CallbackEnv): # Get normalized feature importances (fraction of splits made in each @@ -196,7 +201,7 @@ def write_feature_importances(self, env: CallbackEnv): for feature_name in feature_importances: feature_data = feature_importances[feature_name] / total - self._write_tensor( + self._save_for_tensor( "{}/feature_importance".format(feature_name), feature_data) def write_average_shap(self, env: CallbackEnv): @@ -212,13 +217,12 @@ def write_average_shap(self, env: CallbackEnv): for feature_id, feature_name in enumerate(feature_names): if shap_avg[feature_id] > 0: - self._write_tensor( + self._save_for_tensor( "{}/average_shap".format(feature_name), shap_avg[feature_id]) - def _write_reductions(self, tensor_name, tensor_value, reduction_config): - # not writing reductions for xgboost - return + def _write_for_tensor(self, tensor_name, tensor_value, save_collections): + self._write_raw_tensor(tensor_name, tensor_value, save_collections) @staticmethod def _get_reduction_of_data(reduction_name, tensor_value, tensor_name, abs): @@ -226,7 +230,7 @@ def _get_reduction_of_data(reduction_name, tensor_value, tensor_name, abs): @staticmethod def _make_numpy_array(tensor_value): - return tensor_value + return make_numpy_array(tensor_value) @staticmethod def _validate_data(
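
The record readers in this patch replace the boolean check flag with three levels: 'minimal' verifies the length CRC plus the magic-byte payload CRC written when checksums are disabled, 'full' recomputes the CRC over the whole payload, and any other value skips verification. A sketch of reading events back at the minimal level, assuming RecordReader lives in record_reader.py as the imports above suggest and using a hypothetical event file path:

    from tornasole.core.tfevent.proto.event_pb2 import Event
    from tornasole.core.tfrecord.record_reader import RecordReader

    # hypothetical path to an event file produced by one of the writers above
    reader = RecordReader('/tmp/ts-demo/events/000000000000/000000000000_worker_0.tfevents')
    while reader.has_data():
        # check='full' recomputes the payload CRC; any other string skips the checks
        payload = reader.read_record(check='minimal')
        event = Event()
        event.ParseFromString(payload)
        print(event.step)
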