diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py
index d1d49f2d8..f0854384e 100644
--- a/dask_ml/model_selection/_incremental.py
+++ b/dask_ml/model_selection/_incremental.py
@@ -851,6 +851,7 @@ def _get_params(self):
         return ParameterSampler(self.parameters, self.n_initial_parameters)
 
     def _additional_calls(self, info):
+        # First, run the adaptive algorithm to pick the next time step
         if self.n_initial_parameters == "grid":
             start = len(ParameterGrid(self.parameters))
         else:
@@ -883,20 +884,27 @@ def inverse(time):
         if len(best) == 1:
             [best] = best
             return {best: 0}
 
+        steps = next_time_step - current_time_step
+        instructions = {b: steps for b in best}
+        # Second, stop on plateau if any models have already converged
         out = {}
-        for k in best:
+        for k, steps in instructions.items():
             records = info[k]
-            if self.max_iter and len(records) >= self.max_iter:
+            current_calls = records[-1]["partial_fit_calls"]
+            if self.max_iter and current_calls >= self.max_iter:
                 out[k] = 0
-            elif self.patience and len(records) >= self.patience:
-                old = records[-self.patience]["score"]
-                if all(d["score"] < old + self.tol for d in records[-self.patience :]):
+            elif self.patience and current_calls >= self.patience:
+                plateau = [
+                    h["score"]
+                    for h in records
+                    if current_calls - h["partial_fit_calls"] <= self.patience
+                ]
+                if all(score <= plateau[0] + self.tol for score in plateau[1:]):
                     out[k] = 0
                 else:
-                    out[k] = next_time_step - current_time_step
+                    out[k] = steps
             else:
-                out[k] = next_time_step - current_time_step
-
+                out[k] = steps
         return out
 
diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py
index 9b174eaf6..c9600c92e 100644
--- a/tests/model_selection/test_incremental.py
+++ b/tests/model_selection/test_incremental.py
@@ -4,6 +4,7 @@
 import toolz
 from dask.distributed import Future
 from distributed.utils_test import cluster, gen_cluster, loop  # noqa: F401
+from sklearn.base import BaseEstimator
 from sklearn.cluster import MiniBatchKMeans
 from sklearn.linear_model import SGDClassifier
 from sklearn.model_selection import ParameterGrid, ParameterSampler
@@ -194,12 +195,20 @@ def additional_calls(scores):
 
 @gen_cluster(client=True)
 def test_search_basic(c, s, a, b):
+    for decay_rate in {0, 1}:
+        yield _test_search_basic(decay_rate, c, s, a, b)
+
+
+@gen.coroutine
+def _test_search_basic(decay_rate, c, s, a, b):
     X, y = make_classification(n_samples=1000, n_features=5, chunks=(100, 5))
     model = SGDClassifier(tol=1e-3, loss="log", penalty="elasticnet")
     params = {"alpha": np.logspace(-2, 2, 100), "l1_ratio": np.linspace(0.01, 1, 200)}
 
-    search = IncrementalSearchCV(model, params, n_initial_parameters=20, max_iter=10)
+    search = IncrementalSearchCV(
+        model, params, n_initial_parameters=20, max_iter=10, decay_rate=decay_rate
+    )
     yield search.fit(X, y, classes=[0, 1])
 
     assert search.history_
 
@@ -225,8 +234,15 @@ def test_search_basic(c, s, a, b):
     }.issubset(set(search.cv_results_.keys()))
 
     assert all(isinstance(v, np.ndarray) for v in search.cv_results_.values())
-    assert all(search.cv_results_["test_score"] >= 0)
-    assert all(search.cv_results_["rank_test_score"] >= 1)
+    if decay_rate == 0:
+        assert (
+            search.cv_results_["test_score"][search.best_index_]
+            >= search.cv_results_["test_score"]
+        ).all()
+        assert search.cv_results_["rank_test_score"][search.best_index_] == 1
+    else:
+        assert all(search.cv_results_["test_score"] >= 0)
+        assert all(search.cv_results_["rank_test_score"] >= 1)
     assert all(search.cv_results_["partial_fit_calls"] >= 1)
     assert len(np.unique(search.cv_results_["model_id"])) == len(
         search.cv_results_["model_id"]
@@ -252,16 +268,19 @@ def test_search_basic(c, s, a, b):
 
 @gen_cluster(client=True, timeout=None)
-def test_search_patience(c, s, a, b):
+def test_search_plateau_patience(c, s, a, b):
     X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
 
     class ConstantClassifier(SGDClassifier):
-        def score(*args, **kwargs):
-            return 0.5
+        def __init__(self, value=0):
+            self.value = value
+            super(ConstantClassifier, self).__init__(tol=1e-3)
 
-    model = ConstantClassifier(tol=1e-3)
+        def score(self, *args, **kwargs):
+            return self.value
 
-    params = {"alpha": np.logspace(-2, 10, 100), "l1_ratio": np.linspace(0.01, 1, 200)}
+    params = {"value": np.random.rand(10)}
+    model = ConstantClassifier()
 
     search = IncrementalSearchCV(
         model, params, n_initial_parameters=10, patience=5, tol=0, max_iter=10
     )
@@ -270,10 +289,11 @@ def score(*args, **kwargs):
 
     assert search.history_
     for h in search.history_:
-        assert isinstance(search.best_estimator_, SGDClassifier)
-        assert search.best_score_ > 0
-        assert "visualize" not in search.__dict__
-        assert h["partial_fit_calls"] <= 3
+        assert h["partial_fit_calls"] <= 5
+    assert isinstance(search.best_estimator_, SGDClassifier)
+    assert search.best_score_ == params["value"].max() == search.best_estimator_.value
+    assert "visualize" not in search.__dict__
+    assert search.best_score_ > 0
 
     X_test, y_test = yield c.compute([X, y])
 
@@ -281,6 +301,46 @@ def score(*args, **kwargs):
         search.score(X_test, y_test)
 
 
+@gen_cluster(client=True, timeout=None)
+def test_search_plateau_tol(c, s, a, b):
+    class LinearFunction(BaseEstimator):
+        def __init__(self, intercept=0, slope=1, foo=0):
+            self._num_calls = 0
+            self.intercept = intercept
+            self.slope = slope
+            self.foo = foo  # sklearn estimators must store __init__ params
+            super(LinearFunction, self).__init__()
+
+        def fit(self, *args):
+            return self
+
+        def partial_fit(self, *args, **kwargs):
+            self._num_calls += 1
+            return self
+
+        def score(self, *args, **kwargs):
+            return self.intercept + self.slope * self._num_calls
+
+    model = LinearFunction(slope=1)
+    params = {"foo": np.linspace(0, 1)}
+
+    # Every 3 calls, the score increases by 3. With tol=1, the model improved enough.
+    search = IncrementalSearchCV(
+        model, params, patience=3, tol=1, max_iter=10, decay_rate=0
+    )
+    X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
+    yield search.fit(X, y)
+    assert set(search.cv_results_["partial_fit_calls"]) == {10}
+
+    # Every 3 calls, the score increases by 3. With tol=4, it didn't improve enough.
+    search = IncrementalSearchCV(
+        model, params, patience=3, tol=4, decay_rate=0, max_iter=10
+    )
+    X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
+    yield search.fit(X, y)
+    assert set(search.cv_results_["partial_fit_calls"]) == {3}
+
+
 @gen_cluster(client=True)
 def test_search_max_iter(c, s, a, b):
     X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
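
For reference, a standalone sketch of the plateau check this patch adds to _additional_calls. The names mirror the patch, but the records list is a made-up stand-in for the info[k] bookkeeping entries, chosen to match the tol=4 test case above:

# A record holds the cumulative number of partial_fit calls and the score
# observed at that point; scores rise by 1 per call, like LinearFunction(slope=1).
records = [{"partial_fit_calls": n, "score": float(n)} for n in (1, 2, 3)]
patience, tol = 3, 4

current_calls = records[-1]["partial_fit_calls"]  # 3
# Scores observed within the last `patience` partial_fit calls form the window.
plateau = [
    h["score"]
    for h in records
    if current_calls - h["partial_fit_calls"] <= patience
]
# plateau == [1.0, 2.0, 3.0]: every later score is within tol=4 of the window's
# first score, so the model counts as plateaued and receives no further calls.
assert all(score <= plateau[0] + tol for score in plateau[1:])
# With tol=1 the check would fail (3.0 > 1.0 + 1), so training would continue,
# which is why the tol=1 test above runs all the way to max_iter=10.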