From 503057132d8945defaa87891011742d6e1f75bbf Mon Sep 17 00:00:00 2001 From: Zachary Mueller Date: Thu, 28 Jul 2022 11:56:00 -0400 Subject: [PATCH] Skip and raise NotImplementedError for gather_for_metrics for now (#580) * Skip and raise NotImplementedError for now --- examples/by_feature/checkpointing.py | 11 ++++++++++- examples/by_feature/cross_validation.py | 12 +++++++++++- examples/by_feature/fsdp_with_peak_mem_tracking.py | 11 ++++++++++- examples/by_feature/gradient_accumulation.py | 12 +++++++++++- examples/by_feature/memory.py | 12 +++++++++++- examples/by_feature/multi_process_metrics.py | 2 -- examples/by_feature/tracking.py | 11 ++++++++++- examples/complete_cv_example.py | 14 ++++++++++++-- examples/complete_nlp_example.py | 11 ++++++++++- examples/cv_example.py | 14 ++++++++++++-- examples/nlp_example.py | 12 +++++++++++- src/accelerate/accelerator.py | 3 +++ src/accelerate/test_utils/__init__.py | 1 + src/accelerate/test_utils/testing.py | 5 +++++ tests/test_metrics.py | 2 ++ 15 files changed, 119 insertions(+), 14 deletions(-) diff --git a/examples/by_feature/checkpointing.py b/examples/by_feature/checkpointing.py index 3860875e98a..5d31dd8f433 100644 --- a/examples/by_feature/checkpointing.py +++ b/examples/by_feature/checkpointing.py @@ -234,6 +234,7 @@ def training_function(config, args): accelerator.save_state(output_dir) model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True` (the default). batch.to(accelerator.device) @@ -241,7 +242,15 @@ def training_function(config, args): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) # It is slightly faster to call this once, than multiple times - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/examples/by_feature/cross_validation.py b/examples/by_feature/cross_validation.py index cd5b2fd027c..dc41b91e571 100644 --- a/examples/by_feature/cross_validation.py +++ b/examples/by_feature/cross_validation.py @@ -201,13 +201,23 @@ def training_function(config, args): optimizer.zero_grad() model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. 
batch.to(accelerator.device) with torch.no_grad(): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + # It is slightly faster to call this once, than multiple times + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/examples/by_feature/fsdp_with_peak_mem_tracking.py b/examples/by_feature/fsdp_with_peak_mem_tracking.py index 1426dd0cd0f..b95fb9908cd 100644 --- a/examples/by_feature/fsdp_with_peak_mem_tracking.py +++ b/examples/by_feature/fsdp_with_peak_mem_tracking.py @@ -272,6 +272,7 @@ def collate_fn(examples): # context manager to track the peak memory usage during the evaluation with TorchTracemalloc() as tracemalloc: model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. batch.to(accelerator.device) @@ -279,7 +280,15 @@ def collate_fn(examples): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) # It is slightly faster to call this once, than multiple times - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/examples/by_feature/gradient_accumulation.py b/examples/by_feature/gradient_accumulation.py index 2e6de6a3269..580d07a57a0 100644 --- a/examples/by_feature/gradient_accumulation.py +++ b/examples/by_feature/gradient_accumulation.py @@ -168,13 +168,23 @@ def training_function(config, args): optimizer.zero_grad() model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. 
batch.to(accelerator.device) with torch.no_grad(): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + # It is slightly faster to call this once, than multiple times + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/examples/by_feature/memory.py b/examples/by_feature/memory.py index 6f7cf00fda0..d85319552aa 100644 --- a/examples/by_feature/memory.py +++ b/examples/by_feature/memory.py @@ -180,13 +180,23 @@ def inner_training_loop(batch_size): optimizer.zero_grad() model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. batch.to(accelerator.device) with torch.no_grad(): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + # It is slightly faster to call this once, than multiple times + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/examples/by_feature/multi_process_metrics.py b/examples/by_feature/multi_process_metrics.py index f39b064d6c0..cf581c73d17 100644 --- a/examples/by_feature/multi_process_metrics.py +++ b/examples/by_feature/multi_process_metrics.py @@ -190,8 +190,6 @@ def training_function(config, args): else: # Otherwise we add the number of samples seen samples_seen += references.shape[0] - # All of this can be avoided if you use `Accelerator.gather_for_metrics` instead of `Accelerator.gather`: - # accelerator.gather_for_metrics((predictions, references)) metric.add_batch( predictions=predictions, references=references, diff --git a/examples/by_feature/tracking.py b/examples/by_feature/tracking.py index d8c985e889e..0da8c437ab9 100644 --- a/examples/by_feature/tracking.py +++ b/examples/by_feature/tracking.py @@ -192,6 +192,7 @@ def training_function(config, args): optimizer.zero_grad() model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True` (the default). 
batch.to(accelerator.device) @@ -199,7 +200,15 @@ def training_function(config, args): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) # It is slightly faster to call this once, than multiple times - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/examples/complete_cv_example.py b/examples/complete_cv_example.py index ee051a0c5f1..f984406bfcd 100644 --- a/examples/complete_cv_example.py +++ b/examples/complete_cv_example.py @@ -232,6 +232,7 @@ def training_function(config, args): accelerator.save_state(output_dir) model.eval() accurate = 0 + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. batch = {k: v.to(accelerator.device) for k, v in batch.items()} @@ -239,8 +240,17 @@ def training_function(config, args): with torch.no_grad(): outputs = model(inputs) predictions = outputs.argmax(dim=-1) - predictions, labels = accelerator.gather_for_metrics((predictions, batch["label"])) - accurate_preds = predictions == labels + # It is slightly faster to call this once, than multiple times + predictions, references = accelerator.gather((predictions, batch["label"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] + accurate_preds = predictions == references accurate += accurate_preds.long().sum() eval_metric = accurate.item() / accelerator.gradient_state.samples_seen diff --git a/examples/complete_nlp_example.py b/examples/complete_nlp_example.py index 88d379c8e36..fc0fae90ba6 100644 --- a/examples/complete_nlp_example.py +++ b/examples/complete_nlp_example.py @@ -210,6 +210,7 @@ def collate_fn(examples): accelerator.save_state(output_dir) model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. 
batch.to(accelerator.device) @@ -217,7 +218,15 @@ def collate_fn(examples): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) # It is slightly faster to call this once, than multiple times - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/examples/cv_example.py b/examples/cv_example.py index 6663b2fe9df..875b683987c 100644 --- a/examples/cv_example.py +++ b/examples/cv_example.py @@ -166,6 +166,7 @@ def training_function(config, args): model.eval() accurate = 0 num_elems = 0 + samples_seen = 0 for _, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. batch = {k: v.to(accelerator.device) for k, v in batch.items()} @@ -173,8 +174,17 @@ def training_function(config, args): with torch.no_grad(): outputs = model(inputs) predictions = outputs.argmax(dim=-1) - predictions, labels = accelerator.gather_for_metrics((predictions, batch["label"])) - accurate_preds = predictions == labels + # It is slightly faster to call this once, than multiple times + predictions, references = accelerator.gather((predictions, batch["label"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] + accurate_preds = predictions == references num_elems += accurate_preds.shape[0] accurate += accurate_preds.long().sum() diff --git a/examples/nlp_example.py b/examples/nlp_example.py index 2ef18598be0..33c0ed7aa4c 100644 --- a/examples/nlp_example.py +++ b/examples/nlp_example.py @@ -152,13 +152,23 @@ def training_function(config, args): optimizer.zero_grad() model.eval() + samples_seen = 0 for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. 
batch.to(accelerator.device) with torch.no_grad(): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) - predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + # It is slightly faster to call this once, than multiple times + predictions, references = accelerator.gather((predictions, batch["labels"])) + if accelerator.use_distributed: + if step == len(eval_dataloader) - 1: + # Last batch needs to be truncated on distributed systems as it contains additional samples + predictions = predictions[: len(eval_dataloader.dataset) - samples_seen] + references = references[: len(eval_dataloader.dataset) - samples_seen] + else: + # Otherwise we add the number of samples seen + samples_seen += references.shape[0] metric.add_batch( predictions=predictions, references=references, diff --git a/src/accelerate/accelerator.py b/src/accelerate/accelerator.py index 977cd5d24cd..5a9a55c6873 100644 --- a/src/accelerate/accelerator.py +++ b/src/accelerate/accelerator.py @@ -943,6 +943,9 @@ def gather_for_metrics(self, tensor): tensor (`torch.Tensor`, or a nested tuple/list/dictionary of `torch.Tensor`): The tensors for calculating metrics across all processes. """ + raise NotImplementedError( + "Currently there are a number of bugs with this method. You should use `Accelerator.gather()` and drop the samples yourself for the time being." + ) tensor = self.gather(tensor) if self.use_distributed: if self.gradient_state.remainder == -1: diff --git a/src/accelerate/test_utils/__init__.py b/src/accelerate/test_utils/__init__.py index d65099602c7..382bf81bf55 100644 --- a/src/accelerate/test_utils/__init__.py +++ b/src/accelerate/test_utils/__init__.py @@ -10,6 +10,7 @@ require_multi_gpu, require_single_gpu, require_tpu, + skip, slow, ) from .training import RegressionDataset, RegressionModel diff --git a/src/accelerate/test_utils/testing.py b/src/accelerate/test_utils/testing.py index b5cccc1ff6a..145cc80b24c 100644 --- a/src/accelerate/test_utils/testing.py +++ b/src/accelerate/test_utils/testing.py @@ -57,6 +57,11 @@ def parse_flag_from_env(key, default=False): _run_slow_tests = parse_flag_from_env("RUN_SLOW", default=False) +def skip(test_case): + "Decorator that skips a test unconditionally" + return unittest.skip("Test was skipped")(test_case) + + def slow(test_case): """ Decorator marking a test as slow. Slow tests are skipped by default. Set the RUN_SLOW environment variable to a diff --git a/tests/test_metrics.py b/tests/test_metrics.py index c744a2fdb0b..7e42c793e9b 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -25,11 +25,13 @@ require_cpu, require_multi_gpu, require_single_gpu, + skip, test_metrics, ) from accelerate.utils import get_launch_prefix, patch_environment +@skip class MetricTester(unittest.TestCase): def setUp(self): mod_file = inspect.getfile(accelerate.test_utils)
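
Illustration of the pattern the hunks above apply (a minimal, self-contained sketch, not text from the patch itself; `accelerator`, `model`, `eval_dataloader`, and `metric` are placeholder names assumed to have been prepared by the caller, e.g. via `accelerator.prepare(...)`):

    # Sketch of the evaluation loop these examples switch to: plain
    # `Accelerator.gather()` followed by manual truncation of the duplicated
    # samples in the last batch, instead of the skipped `gather_for_metrics()`.
    import torch


    def evaluate(accelerator, model, eval_dataloader, metric):
        model.eval()
        samples_seen = 0
        for step, batch in enumerate(eval_dataloader):
            batch.to(accelerator.device)
            with torch.no_grad():
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)
            # Gather once for both tensors; slightly faster than two separate calls.
            predictions, references = accelerator.gather((predictions, batch["labels"]))
            if accelerator.use_distributed:
                if step == len(eval_dataloader) - 1:
                    # The sharded dataloader pads the last batch so every process gets
                    # the same number of samples; drop the extras before scoring.
                    predictions = predictions[: len(eval_dataloader.dataset) - samples_seen]
                    references = references[: len(eval_dataloader.dataset) - samples_seen]
                else:
                    # Otherwise just count the samples processed so far.
                    samples_seen += references.shape[0]
            metric.add_batch(predictions=predictions, references=references)
        return metric.compute()

The truncation branch only matters in distributed runs: on a single process `accelerator.gather()` is a no-op and `use_distributed` is False, so the loop reduces to an ordinary evaluation loop.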