From baedfb1132066a46c14189a9a41515a2566e9d18 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Sat, 30 Mar 2024 23:34:16 +0800 Subject: [PATCH 1/2] auto batch size supports methods returns a dict --- deepmd/pt/utils/auto_batch_size.py | 74 ++++++++++++++++++++----- source/tests/pt/test_auto_batch_size.py | 37 +++++++++++++ 2 files changed, 96 insertions(+), 15 deletions(-) create mode 100644 source/tests/pt/test_auto_batch_size.py diff --git a/deepmd/pt/utils/auto_batch_size.py b/deepmd/pt/utils/auto_batch_size.py index 181d56f2f4..eec664db2d 100644 --- a/deepmd/pt/utils/auto_batch_size.py +++ b/deepmd/pt/utils/auto_batch_size.py @@ -12,6 +12,32 @@ class AutoBatchSize(AutoBatchSizeBase): + """Auto batch size. + + Parameters + ---------- + initial_batch_size : int, default: 1024 + initial batch size (number of total atoms) when DP_INFER_BATCH_SIZE + is not set + factor : float, default: 2. + increased factor + returned_dict: + if the batched method returns a dict of arrays. + + """ + + def __init__( + self, + initial_batch_size: int = 1024, + factor: float = 2.0, + returned_dict: bool = False, + ): + super().__init__( + initial_batch_size=initial_batch_size, + factor=factor, + ) + self.returned_dict = returned_dict + def is_gpu_available(self) -> bool: """Check if GPU is available. @@ -78,26 +104,44 @@ def execute_with_batch_size( ) index = 0 - results = [] + results = None while index < total_size: n_batch, result = self.execute(execute_with_batch_size, index, natoms) - if not isinstance(result, tuple): - result = (result,) + if not self.returned_dict: + result = (result,) if not isinstance(result, tuple) else result index += n_batch - if n_batch: - for rr in result: - rr.reshape((n_batch, -1)) - results.append(result) - r_list = [] - for r in zip(*results): + + def append_to_list(res_list, res): + if n_batch: + res_list.append(res) + return res_list + + if not self.returned_dict: + results = [] if results is None else results + results = append_to_list(results, result) + else: + results = ( + {kk: [] for kk in result.keys()} if results is None else results + ) + results = { + kk: append_to_list(results[kk], result[kk]) for kk in result.keys() + } + + def concate_result(r): if isinstance(r[0], np.ndarray): - r_list.append(np.concatenate(r, axis=0)) + ret = np.concatenate(r, axis=0) elif isinstance(r[0], torch.Tensor): - r_list.append(torch.cat(r, dim=0)) + ret = torch.cat(r, dim=0) else: raise RuntimeError(f"Unexpected result type {type(r[0])}") - r = tuple(r_list) - if len(r) == 1: - # avoid returning tuple if callable doesn't return tuple - r = r[0] + return ret + + if not self.returned_dict: + r_list = [concate_result(r) for r in zip(*results)] + r = tuple(r_list) + if len(r) == 1: + # avoid returning tuple if callable doesn't return tuple + r = r[0] + else: + r = {kk: concate_result(vv) for kk, vv in results.items()} return r diff --git a/source/tests/pt/test_auto_batch_size.py b/source/tests/pt/test_auto_batch_size.py new file mode 100644 index 0000000000..0d2f5a483e --- /dev/null +++ b/source/tests/pt/test_auto_batch_size.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import unittest + +import numpy as np + +from deepmd.pt.utils.auto_batch_size import ( + AutoBatchSize, +) + + +class TestAutoBatchSize(unittest.TestCase): + def test_execute_all(self): + dd0 = np.zeros((10000, 2, 1, 3, 4)) + dd1 = np.ones((10000, 2, 1, 3, 4)) + auto_batch_size = AutoBatchSize(256, 2.0) + + def func(dd1): + return np.zeros_like(dd1), np.ones_like(dd1) + + dd2 = auto_batch_size.execute_all(func, 10000, 2, dd1) + np.testing.assert_equal(dd0, dd2[0]) + np.testing.assert_equal(dd1, dd2[1]) + + def test_execute_all_dict(self): + dd0 = np.zeros((10000, 2, 1, 3, 4)) + dd1 = np.ones((10000, 2, 1, 3, 4)) + auto_batch_size = AutoBatchSize(256, 2.0, returned_dict=True) + + def func(dd1): + return { + "foo": np.zeros_like(dd1), + "bar": np.ones_like(dd1), + } + + dd2 = auto_batch_size.execute_all(func, 10000, 2, dd1) + np.testing.assert_equal(dd0, dd2["foo"]) + np.testing.assert_equal(dd1, dd2["bar"]) From 3c4ff2c1c34675c93b853902fd6b0d699034479a Mon Sep 17 00:00:00 2001 From: Han Wang Date: Sun, 31 Mar 2024 12:28:19 +0800 Subject: [PATCH 2/2] check if a dict is returned at runtime. --- deepmd/pt/utils/auto_batch_size.py | 16 +++++++++------- source/tests/pt/test_auto_batch_size.py | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/deepmd/pt/utils/auto_batch_size.py b/deepmd/pt/utils/auto_batch_size.py index eec664db2d..13264a336c 100644 --- a/deepmd/pt/utils/auto_batch_size.py +++ b/deepmd/pt/utils/auto_batch_size.py @@ -21,8 +21,6 @@ class AutoBatchSize(AutoBatchSizeBase): is not set factor : float, default: 2. increased factor - returned_dict: - if the batched method returns a dict of arrays. """ @@ -30,13 +28,11 @@ def __init__( self, initial_batch_size: int = 1024, factor: float = 2.0, - returned_dict: bool = False, ): super().__init__( initial_batch_size=initial_batch_size, factor=factor, ) - self.returned_dict = returned_dict def is_gpu_available(self) -> bool: """Check if GPU is available. @@ -105,9 +101,13 @@ def execute_with_batch_size( index = 0 results = None + returned_dict = None while index < total_size: n_batch, result = self.execute(execute_with_batch_size, index, natoms) - if not self.returned_dict: + returned_dict = ( + isinstance(result, dict) if returned_dict is None else returned_dict + ) + if not returned_dict: result = (result,) if not isinstance(result, tuple) else result index += n_batch @@ -116,7 +116,7 @@ def append_to_list(res_list, res): res_list.append(res) return res_list - if not self.returned_dict: + if not returned_dict: results = [] if results is None else results results = append_to_list(results, result) else: @@ -126,6 +126,8 @@ def append_to_list(res_list, res): results = { kk: append_to_list(results[kk], result[kk]) for kk in result.keys() } + assert results is not None + assert returned_dict is not None def concate_result(r): if isinstance(r[0], np.ndarray): @@ -136,7 +138,7 @@ def concate_result(r): raise RuntimeError(f"Unexpected result type {type(r[0])}") return ret - if not self.returned_dict: + if not returned_dict: r_list = [concate_result(r) for r in zip(*results)] r = tuple(r_list) if len(r) == 1: diff --git a/source/tests/pt/test_auto_batch_size.py b/source/tests/pt/test_auto_batch_size.py index 0d2f5a483e..71194e001e 100644 --- a/source/tests/pt/test_auto_batch_size.py +++ b/source/tests/pt/test_auto_batch_size.py @@ -24,7 +24,7 @@ def func(dd1): def test_execute_all_dict(self): dd0 = np.zeros((10000, 2, 1, 3, 4)) dd1 = np.ones((10000, 2, 1, 3, 4)) - auto_batch_size = AutoBatchSize(256, 2.0, returned_dict=True) + auto_batch_size = AutoBatchSize(256, 2.0) def func(dd1): return {