From 8005b7553c47e64dadec25a5c99c1c8777c766ef Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 11:03:56 -0500 Subject: [PATCH 01/14] API: rename IncrementalSearch => IncrementalSearchCV and change it in the docs too --- dask_ml/model_selection/__init__.py | 4 ++-- dask_ml/model_selection/_incremental.py | 16 ++++++++-------- docs/source/hyper-parameter-search.rst | 10 +++++----- tests/model_selection/test_incremental.py | 16 +++++++++------- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/dask_ml/model_selection/__init__.py b/dask_ml/model_selection/__init__.py index d7e9633aa..ef9122666 100644 --- a/dask_ml/model_selection/__init__.py +++ b/dask_ml/model_selection/__init__.py @@ -19,8 +19,8 @@ try: - from ._incremental import IncrementalSearch # noqa: F401 + from ._incremental import IncrementalSearchCV # noqa: F401 - __all__.extend(["IncrementalSearch"]) + __all__.extend(["IncrementalSearchCV"]) except ImportError: pass diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index b733c4918..f25ed8f50 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -396,7 +396,7 @@ def fit( # ---------------------------------------------------------------------------- -class BaseIncrementalSearch(BaseEstimator, MetaEstimatorMixin): +class BaseIncrementalSearchCV(BaseEstimator, MetaEstimatorMixin): """Base class for estimators using the incremental `fit`. Subclasses must implement the following abstract method @@ -599,7 +599,7 @@ def score(self, X, y=None): return self.scorer_(self.best_estimator_, X, y) -class IncrementalSearch(BaseIncrementalSearch): +class IncrementalSearchCV(BaseIncrementalSearchCV): """ Incrementally search for hyper-parameters on models that support partial_fit @@ -715,16 +715,16 @@ class IncrementalSearch(BaseIncrementalSearch): ... 'l1_ratio': np.linspace(0, 1, num=1000), ... 'average': [True, False]} - >>> search = IncrementalSearch(model, params, random_state=0) + >>> search = IncrementalSearchCV(model, params, random_state=0) >>> search.fit(X, y, classes=[0, 1]) - IncrementalSearch(...) + IncrementalSearchCV(...) Alternatively you can provide keywords to start with more hyper-parameters, but stop those that don't seem to improve with more data. - >>> search = IncrementalSearch(model, params, random_state=0, - ... n_initial_parameters=1000, - ... patience=20, max_iter=100) + >>> search = IncrementalSearchCV(model, params, random_state=0, + ... n_initial_parameters=1000, + ... patience=20, max_iter=100) """ def __init__( @@ -747,7 +747,7 @@ def __init__( self.tol = tol self.scores_per_fit = scores_per_fit self.max_iter = max_iter - super(IncrementalSearch, self).__init__( + super(IncrementalSearchCV, self).__init__( estimator, param_distribution, test_size, random_state, scoring ) diff --git a/docs/source/hyper-parameter-search.rst b/docs/source/hyper-parameter-search.rst index b1bb289c6..3813c5af7 100644 --- a/docs/source/hyper-parameter-search.rst +++ b/docs/source/hyper-parameter-search.rst @@ -190,7 +190,7 @@ hyperparameter optimization. These should be used when your full dataset doesn't fit in memory on a single machine. .. 
autosummary:: - dask_ml.model_selection.IncrementalSearch + dask_ml.model_selection.IncrementalSearchCV Broadly speaking, incremental optimization starts with a batch of models (underlying estimators and hyperparameter combinationms) and repeatedly calls the underlying estimator's @@ -201,7 +201,7 @@ estimators and hyperparameter combinationms) and repeatedly calls the underlying These estimators require the optional ``distributed`` library. Here's an example training on a "large" dataset (a Dask array) with the -``IncrementalSearch`` +``IncrementalSearchCV``. .. ipython:: python @@ -234,9 +234,9 @@ train-and-score them until we find the best one. .. ipython:: python - from dask_ml.model_selection import IncrementalSearch + from dask_ml.model_selection import IncrementalSearchCV - search = IncrementalSearch(model, params, random_state=0) + search = IncrementalSearchCV(model, params, random_state=0) search.fit(X, y, classes=[0, 1]) Note that when you do post-fit tasks like ``search.score``, the underlying @@ -256,7 +256,7 @@ to use post-estimation features like scoring or prediction, we recommend using model = ParallelPostFit(SGDClassifier(tol=1e-3, penalty="elasticnet", random_state=0)) - search = IncrementalSearch(model, params, random_state=0) + search = IncrementalSearchCV(model, params, random_state=0) search.fit(X, y, classes=[0, 1]) search.score(X, y) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 68563ce34..b53f4e306 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -10,7 +10,7 @@ from tornado import gen from dask_ml.datasets import make_classification -from dask_ml.model_selection import IncrementalSearch +from dask_ml.model_selection import IncrementalSearchCV from dask_ml.model_selection._incremental import _partial_fit, _score, fit @@ -194,7 +194,7 @@ def test_search(c, s, a, b): params = {"alpha": np.logspace(-2, 2, 100), "l1_ratio": np.linspace(0.01, 1, 200)} - search = IncrementalSearch(model, params, n_initial_parameters=10, max_iter=10) + search = IncrementalSearchCV(model, params, n_initial_parameters=20, max_iter=10) yield search.fit(X, y, classes=[0, 1]) assert search.history_results_ @@ -226,7 +226,9 @@ def score(*args, **kwargs): params = {"alpha": np.logspace(-2, 10, 100), "l1_ratio": np.linspace(0.01, 1, 200)} - search = IncrementalSearch(model, params, n_initial_parameters=10, patience=2) + search = IncrementalSearchCV( + model, params, n_initial_parameters=10, patience=5, tol=0, max_iter=10 + ) yield search.fit(X, y, classes=[0, 1]) assert search.history_results_ @@ -248,7 +250,7 @@ def test_search_max_iter(c, s, a, b): model = SGDClassifier(tol=1e-3, penalty="elasticnet") params = {"alpha": np.logspace(-2, 10, 10), "l1_ratio": np.linspace(0.01, 1, 20)} - search = IncrementalSearch(model, params, n_initial_parameters=10, max_iter=1) + search = IncrementalSearchCV(model, params, n_initial_parameters=10, max_iter=1) yield search.fit(X, y, classes=[0, 1]) for d in search.history_results_: assert d["partial_fit_calls"] <= 1 @@ -262,7 +264,7 @@ def test_gridsearch(c, s, a, b): params = {"alpha": np.logspace(-2, 10, 3), "l1_ratio": np.linspace(0.01, 1, 2)} - search = IncrementalSearch(model, params, n_initial_parameters="grid") + search = IncrementalSearchCV(model, params, n_initial_parameters="grid") yield search.fit(X, y, classes=[0, 1]) assert {frozenset(d["params"].items()) for d in search.history_results_} == { @@ -277,7 +279,7 @@ def test_numpy_array(c, 
s, a, b): model = SGDClassifier(tol=1e-3, penalty="elasticnet") params = {"alpha": np.logspace(-2, 10, 10), "l1_ratio": np.linspace(0.01, 1, 20)} - search = IncrementalSearch(model, params, n_initial_parameters=10) + search = IncrementalSearchCV(model, params, n_initial_parameters=10) yield search.fit(X, y, classes=[0, 1]) @@ -286,7 +288,7 @@ def test_transform(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) model = MiniBatchKMeans(random_state=0) params = {"n_clusters": [3, 4, 5], "n_init": [1, 2]} - search = IncrementalSearch(model, params, n_initial_parameters="grid") + search = IncrementalSearchCV(model, params, n_initial_parameters="grid") yield search.fit(X, y) X_, = yield c.compute([X]) result = search.transform(X_) From 0335087259929796b8b439bfb80498807290efb0 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 11:22:02 -0500 Subject: [PATCH 02/14] API: rename history_results_, and format differently * Rename history_results_ => history_ * Provide complete model history, and make it public (otherwise boilerplate needed to formulate model_history_ from history_, looping over items in history and putting in dict, {model_id: hist}) --- dask_ml/model_selection/_incremental.py | 34 +++++++--------------- tests/model_selection/test_incremental.py | 35 +++++++++++++---------- 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index f25ed8f50..fa18d5e36 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -2,7 +2,7 @@ import itertools import operator -from collections import namedtuple +from collections import defaultdict, namedtuple from copy import deepcopy from time import time @@ -256,6 +256,12 @@ def get_futures(partial_fit_calls): models = {k: client.submit(operator.getitem, v, 0) for k, v in models.items()} yield wait(models) + + info = defaultdict(list) + for h in history: + info[h["model_id"]].append(h) + info = dict(info) + raise gen.Return(Results(info, models, history)) @@ -459,27 +465,6 @@ def _get_params(self): """ return ParameterGrid(self.parameters) - def _get_history_results(self, results): - # type: (Results) -> Dict - """Construct the CV results. 
- - Has the following keys: - - * params - * test_score - * mean_test_score - * rank_test_score - * mean_partial_fit_time - * std_partial_fit_time - * mean_score_time - * std_score_time - * partial_fit_calls - * model_id - """ - info, model, history = results - key = operator.itemgetter("model_id") - hist2 = sorted(history, key=key) - return hist2 def _get_best(self, results, history_results): # type: (Dict, Dict) -> Estimator @@ -530,9 +515,9 @@ def _fit(self, X, y, **fit_params): random_state=self.random_state, ) results = self._process_results(results) - history_results = self._get_history_results(results) best_estimator, best_index = self._get_best(results, history_results) best_estimator = yield best_estimator + model_history, models, history = results # Clean up models we're hanging onto ids = list(results.models) @@ -540,7 +525,8 @@ def _fit(self, X, y, **fit_params): del results.models[model_id] self.scorer_ = scorer - self.history_results_ = history_results + self.history_ = history + self.model_history_ = model_history self.best_estimator_ = best_estimator self.best_index_ = best_index self.best_score_ = history_results[best_index]["score"] diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index b53f4e306..4f81a7979 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -65,14 +65,15 @@ def additional_calls(info): # `<` not `==` because we randomly dropped one model assert len(history) < n_parameters * 10 - for key in { - "partial_fit_time", - "score_time", - "model_id", - "params", - "partial_fit_calls", - }: - assert key in history[0] + for h in history: + assert { + "partial_fit_time", + "score_time", + "score", + "model_id", + "params", + "partial_fit_calls", + }.issubset(set(h.keys())) groups = toolz.groupby("partial_fit_calls", history) assert len(groups[1]) > len(groups[2]) > len(groups[3]) > len(groups[max(groups)]) @@ -178,8 +179,12 @@ def additional_calls(scores): assert meta["params"] == {"alpha": 0.1} assert meta["partial_fit_calls"] == 6 + 1 - assert len(models) == len(info) == 1 + assert len(info) > len(models) == 1 + assert set(models.keys()).issubset(set(info.keys())) assert meta["partial_fit_calls"] == history[-1]["partial_fit_calls"] + calls = {k: [h["partial_fit_calls"] for h in hist] for k, hist in info.items()} + for k, call in calls.items(): + assert (np.diff(call) >= 1).all() assert set(models.keys()) == {0} del models[0] @@ -197,8 +202,8 @@ def test_search(c, s, a, b): search = IncrementalSearchCV(model, params, n_initial_parameters=20, max_iter=10) yield search.fit(X, y, classes=[0, 1]) - assert search.history_results_ - for d in search.history_results_: + assert search.history_ + for d in search.history_: assert d["partial_fit_calls"] <= search.max_iter + 1 assert isinstance(search.best_estimator_, SGDClassifier) assert search.best_score_ > 0 @@ -231,9 +236,9 @@ def score(*args, **kwargs): ) yield search.fit(X, y, classes=[0, 1]) - assert search.history_results_ - for d in search.history_results_: assert d["partial_fit_calls"] <= 3 + assert search.history_ + for h in search.history_: assert isinstance(search.best_estimator_, SGDClassifier) assert search.best_score_ > 0 assert "visualize" not in search.__dict__ @@ -252,7 +257,7 @@ def test_search_max_iter(c, s, a, b): search = IncrementalSearchCV(model, params, n_initial_parameters=10, max_iter=1) yield search.fit(X, y, classes=[0, 1]) - for d in search.history_results_: + for d in search.history_: assert 
d["partial_fit_calls"] <= 1 @@ -267,7 +272,7 @@ def test_gridsearch(c, s, a, b): search = IncrementalSearchCV(model, params, n_initial_parameters="grid") yield search.fit(X, y, classes=[0, 1]) - assert {frozenset(d["params"].items()) for d in search.history_results_} == { + assert {frozenset(d["params"].items()) for d in search.history_} == { frozenset(d.items()) for d in ParameterGrid(params) } From 66da9374ea77e7678f0cb892d7eb55aaa45fe428 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 11:28:47 -0500 Subject: [PATCH 03/14] API: add cv_results_ This mirrors scikit-learn's cv_results_, with a one important distinction: this implementation only test on 1 training set. This means that there's a `test_score` key, not `mean_test_score`, or `test_score0`. --- dask_ml/model_selection/_incremental.py | 48 ++++++++++++++++++++--- tests/model_selection/test_incremental.py | 22 ++++++++++- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index fa18d5e36..625b76579 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -465,6 +465,42 @@ def _get_params(self): """ return ParameterGrid(self.parameters) + def _get_cv_results(self, history, model_hist): + cv_results = {} + best_scores = {} + best_scores = {k: hist[-1]["score"] for k, hist in model_hist.items()} + + cv_results = {} + for k, hist in model_hist.items(): + pf_times = list(toolz.pluck("partial_fit_time", hist)) + score_times = list(toolz.pluck("score_time", hist)) + cv_results[k] = { + "mean_partial_fit_time": np.mean(pf_times), + "mean_score_time": np.mean(score_times), + "std_partial_fit_time": np.std(pf_times), + "std_score_time": np.std(score_times), + "test_score": best_scores[k], + "model_id": k, + "params": hist[0]["params"], + "partial_fit_calls": hist[-1]["partial_fit_calls"], + } + cv_results = list(cv_results.values()) # list of dicts + cv_results = {k: [res[k] for res in cv_results] for k in cv_results[0]} + + # Every model will have the same params because this class uses either + # ParameterSampler or ParameterGrid + cv_results.update( + { + "param_" + k: v + for params in cv_results["params"] + for k, v in params.items() + } + ) + cv_results = {k: np.array(v) for k, v in cv_results.items()} + + order = (-1 * cv_results["test_score"]).argsort() + cv_results["rank_test_score"] = order.argsort() + 1 + return cv_results def _get_best(self, results, history_results): # type: (Dict, Dict) -> Estimator @@ -515,22 +551,24 @@ def _fit(self, X, y, **fit_params): random_state=self.random_state, ) results = self._process_results(results) - best_estimator, best_index = self._get_best(results, history_results) - best_estimator = yield best_estimator model_history, models, history = results + cv_results = self._get_cv_results(history, model_history) + best_estimator = yield best_est + # Clean up models we're hanging onto ids = list(results.models) for model_id in ids: del results.models[model_id] + self.cv_results_ = cv_results self.scorer_ = scorer self.history_ = history self.model_history_ = model_history self.best_estimator_ = best_estimator - self.best_index_ = best_index - self.best_score_ = history_results[best_index]["score"] - self.best_params_ = history_results[best_index]["params"] + self.best_index_ = best_idx + self.best_score_ = cv_results["test_score"][best_idx] + self.best_params_ = cv_results["params"][best_idx] self.n_splits_ = 1 self.multimetric_ = False # TODO: 
is this always true? raise gen.Return(self) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 4f81a7979..94ebbc5d8 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -193,7 +193,7 @@ def additional_calls(scores): @gen_cluster(client=True) -def test_search(c, s, a, b): +def test_search_basic(c, s, a, b): X, y = make_classification(n_samples=1000, n_features=5, chunks=(100, 5)) model = SGDClassifier(tol=1e-3, loss="log", penalty="elasticnet") @@ -209,6 +209,26 @@ def test_search(c, s, a, b): assert search.best_score_ > 0 assert "visualize" not in search.__dict__ assert search.best_params_ + assert search.cv_results_ and isinstance(search.cv_results_, dict) + assert { + "mean_partial_fit_time", + "mean_score_time", + "std_partial_fit_time", + "std_score_time", + "test_score", + "rank_test_score", + "model_id", + "params", + "partial_fit_calls", + "param_alpha", + "param_l1_ratio", + }.issubset(set(search.cv_results_.keys())) + assert all(isinstance(v, np.ndarray) for v in search.cv_results_.values()) + assert ( + search.cv_results_["test_score"][search.best_index_] + >= search.cv_results_["test_score"] + ).all() + assert search.cv_results_["rank_test_score"][search.best_index_] == 1 X_, = yield c.compute([X]) proba = search.predict_proba(X_) From 28793deaf98b37ef5d9425c1ec975a11b397e465 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 11:34:13 -0500 Subject: [PATCH 04/14] MAINT: allow _additional_calls to return multiple models Before, BaseIncrementalSearchCV assumed _additional_calls returned one model and returned that to the user. Now, BaseIncrementalSearchCV chooses the model with the highest score returned by _additional_calls. This matters if desired to do a random search, or if `max_iter` is hit. --- dask_ml/model_selection/_incremental.py | 36 +++++++++++++------------ 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 625b76579..4965e9282 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -1,6 +1,5 @@ from __future__ import division -import itertools import operator from collections import defaultdict, namedtuple from copy import deepcopy @@ -18,7 +17,6 @@ from sklearn.utils import check_random_state from sklearn.utils.metaestimators import if_delegate_has_method from sklearn.utils.validation import check_is_fitted -from toolz import first from tornado import gen from ..utils import check_array @@ -502,21 +500,24 @@ def _get_cv_results(self, history, model_hist): cv_results["rank_test_score"] = order.argsort() + 1 return cv_results - def _get_best(self, results, history_results): - # type: (Dict, Dict) -> Estimator - """Select the best estimator from the set of estimators.""" - best_model_id = first(results.info) - key = operator.itemgetter("model_id") - best_index = -1 - # history_results is sorted by (model_id, partial_fit_calls) - # best is the model_id with the highest partial fit calls - for k, v in itertools.groupby(history_results, key=key): - v = list(v) - best_index += len(v) - if k == best_model_id: - break - - return results.models[best_model_id], best_index + def _get_best(self, results, cv_results): + scores = { + k: v[-1]["score"] for k, v in results.info.items() if k in results.models + } + + # Could use max(scores, key=score.get), but what if score is repeated? 
+ # Happens in the test case a lot + model_ids = list(scores.keys()) + scores = [scores[k] for k in model_ids] + model_idx = np.argmax(scores) + best_model_id = model_ids[model_idx] + + best_est = results.models[best_model_id] + + idx = cv_results["model_id"] == best_model_id + assert idx.sum() == 1 + best_idx = np.argmax(idx) + return best_idx, best_est def _process_results(self, results): """Called with the output of `fit` immediately after it finishes. @@ -554,6 +555,7 @@ def _fit(self, X, y, **fit_params): model_history, models, history = results cv_results = self._get_cv_results(history, model_history) + best_idx, best_est = self._get_best(results, cv_results) best_estimator = yield best_est # Clean up models we're hanging onto From f46ae41bb53447d702b93d83679aabc40888948d Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 11:47:42 -0500 Subject: [PATCH 05/14] TST, MAINT: clean stopping on plateau (see notes below) * MAINT: cleaner separation with _adapt and _stop_on_plateau functions (separates complex adaptive algorithm and stopping on plateau, and allows for overwriting _adapt for other adaptive algorithms that want to stop on plateau) * TST: implement tests for patience and tolerance parameters * MAINT: define "patience" to be the number of partial_fit calls, not the number of score calls --- dask_ml/model_selection/_incremental.py | 47 +++++++++++------- tests/model_selection/test_incremental.py | 60 ++++++++++++++++++++--- 2 files changed, 83 insertions(+), 24 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 4965e9282..fc6918f51 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -783,7 +783,35 @@ def _get_params(self): else: return ParameterSampler(self.parameters, self.n_initial_parameters) + def _stop_on_plateau(self, info, instructions): + out = {} + for k, steps in instructions.items(): + records = info[k] + current_calls = records[-1]["partial_fit_calls"] + if self.max_iter and current_calls >= self.max_iter: + out[k] = 0 + elif self.patience and current_calls >= self.patience: + plateau = [ + h["score"] + for h in records + if current_calls - h["partial_fit_calls"] <= self.patience + ] + if all(score <= plateau[0] + self.tol for score in plateau[1:]): + out[k] = 0 + else: + out[k] = steps + + else: + out[k] = steps + return out + def _additional_calls(self, info): + instructions = self._adapt(info) + + out = self._stop_on_plateau(info, instructions) + return out + + def _adapt(self, info): if self.n_initial_parameters == "grid": start = len(ParameterGrid(self.parameters)) else: @@ -810,20 +838,5 @@ def inverse(time): if len(best) == 1: [best] = best return {best: 0} - - out = {} - for k in best: - records = info[k] - if self.max_iter and len(records) >= self.max_iter: - out[k] = 0 - elif self.patience and len(records) >= self.patience: - old = records[-self.patience]["score"] - if all(d["score"] < old + self.tol for d in records[-self.patience :]): - out[k] = 0 - else: - out[k] = next_time_step - current_time_step - - else: - out[k] = next_time_step - current_time_step - - return out + steps = next_time_step - current_time_step + return {b: steps for b in best} diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 94ebbc5d8..26476bad3 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -4,6 +4,7 @@ import toolz from dask.distributed import 
Future from distributed.utils_test import cluster, gen_cluster, loop # noqa: F401 +from sklearn.base import BaseEstimator from sklearn.cluster import MiniBatchKMeans from sklearn.linear_model import SGDClassifier from sklearn.model_selection import ParameterGrid, ParameterSampler @@ -240,27 +241,31 @@ def test_search_basic(c, s, a, b): @gen_cluster(client=True, timeout=None) -def test_search_patience(c, s, a, b): +def test_search_plateau_patience(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) class ConstantClassifier(SGDClassifier): - def score(*args, **kwargs): - return 0.5 - model = ConstantClassifier(tol=1e-3) + def __init__(self, value=0): + self.value = value + super(ConstantClassifier, self).__init__(tol=1e-3) - params = {"alpha": np.logspace(-2, 10, 100), "l1_ratio": np.linspace(0.01, 1, 200)} + def score(self, *args, **kwargs): + return self.value + + params = {"value": np.random.rand(10)} + model = ConstantClassifier() search = IncrementalSearchCV( model, params, n_initial_parameters=10, patience=5, tol=0, max_iter=10 ) yield search.fit(X, y, classes=[0, 1]) - assert d["partial_fit_calls"] <= 3 assert search.history_ for h in search.history_: + assert h["partial_fit_calls"] <= 5 assert isinstance(search.best_estimator_, SGDClassifier) - assert search.best_score_ > 0 + assert search.best_score_ == params["value"].max() == search.best_estimator_.value assert "visualize" not in search.__dict__ X_test, y_test = yield c.compute([X, y]) @@ -269,6 +274,47 @@ def score(*args, **kwargs): search.score(X_test, y_test) +@gen_cluster(client=True, timeout=None) +def test_search_plateau_tol(c, s, a, b): + + class LinearFunction(BaseEstimator): + + def __init__(self, intercept=0, slope=1, foo=0): + self._num_calls = 0 + self.intercept = intercept + self.slope = slope + super(LinearFunction, self).__init__() + + def fit(self, *args): + return self + + def partial_fit(self, *args, **kwargs): + self._num_calls += 1 + return self + + def score(self, *args, **kwargs): + return self.intercept + self.slope * self._num_calls + + model = LinearFunction(slope=1) + params = {"foo": np.linspace(0, 1)} + + # every 3 calls, score will increase by 3. tol=1: model did improved enough + search = IncrementalSearchCV( + model, params, patience=3, tol=1, max_iter=10, decay_rate=0 + ) + X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) + yield search.fit(X, y) + assert set(search.cv_results_["partial_fit_calls"]) == {10} + + # Every 3 calls, score increases by 3. 
tol=4: model didn't improve enough + search = IncrementalSearchCV( + model, params, patience=3, tol=4, decay_rate=0, max_iter=10 + ) + X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) + yield search.fit(X, y) + assert set(search.cv_results_["partial_fit_calls"]) == {3} + + @gen_cluster(client=True) def test_search_max_iter(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) From 8c7153d929c5fc67355301f26e9be566944b9ec0 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 11:49:24 -0500 Subject: [PATCH 06/14] =?UTF-8?q?BUG:=20=E2=88=9E=20loop=20in=20Incrementa?= =?UTF-8?q?lSearchCV=20if=20decay=5Frate=3D0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dask_ml/model_selection/_incremental.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index fc6918f51..d6a5eef9c 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -824,13 +824,14 @@ def inverse(time): example = toolz.first(info.values()) time_step = example[-1]["partial_fit_calls"] - current_time_step = time_step + 1 - next_time_step = current_time_step - while inverse(current_time_step) == inverse(next_time_step) and ( - not self.patience - or next_time_step - current_time_step < self.scores_per_fit - ): - next_time_step += 1 + current_time_step = time_step + next_time_step = current_time_step + 1 + if self.decay_rate: + while inverse(current_time_step) == inverse(next_time_step) and ( + not self.patience + or next_time_step - current_time_step < self.scores_per_fit + ): + next_time_step += 1 target = inverse(next_time_step) best = toolz.topk(target, info, key=lambda k: info[k][-1]["score"]) From b5ce47c1d1bb973a46a471aac14c30d4d997499b Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 12:32:27 -0500 Subject: [PATCH 07/14] TST: perform basic search (decay_rate=0) in test_search_basic --- tests/model_selection/test_incremental.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 26476bad3..065e8b423 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -200,7 +200,9 @@ def test_search_basic(c, s, a, b): params = {"alpha": np.logspace(-2, 2, 100), "l1_ratio": np.linspace(0.01, 1, 200)} - search = IncrementalSearchCV(model, params, n_initial_parameters=20, max_iter=10) + search = IncrementalSearchCV( + model, params, n_initial_parameters=20, max_iter=10, decay_rate=0 + ) yield search.fit(X, y, classes=[0, 1]) assert search.history_ From c73f79c55bcad769fdbb4d06101844f7dc539c87 Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Sat, 13 Oct 2018 13:24:26 -0500 Subject: [PATCH 08/14] MAINT: collapse _adapt and _stop_on_plateau into one function --- dask_ml/model_selection/_incremental.py | 53 +++++++++++-------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index d6a5eef9c..1c5c8c41e 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -783,35 +783,8 @@ def _get_params(self): else: return ParameterSampler(self.parameters, self.n_initial_parameters) - def _stop_on_plateau(self, info, instructions): - out = {} - for k, steps in 
instructions.items(): - records = info[k] - current_calls = records[-1]["partial_fit_calls"] - if self.max_iter and current_calls >= self.max_iter: - out[k] = 0 - elif self.patience and current_calls >= self.patience: - plateau = [ - h["score"] - for h in records - if current_calls - h["partial_fit_calls"] <= self.patience - ] - if all(score <= plateau[0] + self.tol for score in plateau[1:]): - out[k] = 0 - else: - out[k] = steps - - else: - out[k] = steps - return out - def _additional_calls(self, info): - instructions = self._adapt(info) - - out = self._stop_on_plateau(info, instructions) - return out - - def _adapt(self, info): + # First, have an adaptive algorithm if self.n_initial_parameters == "grid": start = len(ParameterGrid(self.parameters)) else: @@ -840,4 +813,26 @@ def inverse(time): [best] = best return {best: 0} steps = next_time_step - current_time_step - return {b: steps for b in best} + instructions = {b: steps for b in best} + + # Second, stop on plateau if any models have already converged + out = {} + for k, steps in instructions.items(): + records = info[k] + current_calls = records[-1]["partial_fit_calls"] + if self.max_iter and current_calls >= self.max_iter: + out[k] = 0 + elif self.patience and current_calls >= self.patience: + plateau = [ + h["score"] + for h in records + if current_calls - h["partial_fit_calls"] <= self.patience + ] + if all(score <= plateau[0] + self.tol for score in plateau[1:]): + out[k] = 0 + else: + out[k] = steps + + else: + out[k] = steps + return out From ead5f31bea2bb2110bb6b89578f1a675a99ce469 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 15 Oct 2018 05:58:13 -0500 Subject: [PATCH 09/14] Fix test formatting --- tests/model_selection/test_incremental.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 94ebbc5d8..88b0644c9 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -256,12 +256,12 @@ def score(*args, **kwargs): ) yield search.fit(X, y, classes=[0, 1]) - assert d["partial_fit_calls"] <= 3 assert search.history_ for h in search.history_: - assert isinstance(search.best_estimator_, SGDClassifier) - assert search.best_score_ > 0 - assert "visualize" not in search.__dict__ + assert isinstance(search.best_estimator_, SGDClassifier) + assert search.best_score_ > 0 + assert "visualize" not in search.__dict__ + assert h["partial_fit_calls"] <= 3 X_test, y_test = yield c.compute([X, y]) From 7b43e1658b283829def5b297e7c3b951a6eff4a9 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 15 Oct 2018 14:32:04 -0500 Subject: [PATCH 10/14] Use scipy to rank --- dask_ml/model_selection/_incremental.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 4965e9282..5a9842def 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -8,6 +8,7 @@ import dask import dask.array as da import numpy as np +import scipy.stats import toolz from dask.distributed import Future, default_client, futures_of, wait from distributed.utils import log_errors @@ -495,9 +496,9 @@ def _get_cv_results(self, history, model_hist): } ) cv_results = {k: np.array(v) for k, v in cv_results.items()} - - order = (-1 * cv_results["test_score"]).argsort() - cv_results["rank_test_score"] = order.argsort() + 1 + cv_results["rank_test_score"] = 
scipy.stats.rankdata( + -cv_results["test_score"], method="min" + ).astype(int) return cv_results def _get_best(self, results, cv_results): From 74d77392cd55c304c9eb9158bdb9afc84b01ae38 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 15 Oct 2018 14:39:41 -0500 Subject: [PATCH 11/14] Replace cv_results asserts with sanity checks --- tests/model_selection/test_incremental.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 88b0644c9..7ffc0711f 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -223,12 +223,15 @@ def test_search_basic(c, s, a, b): "param_alpha", "param_l1_ratio", }.issubset(set(search.cv_results_.keys())) + assert all(isinstance(v, np.ndarray) for v in search.cv_results_.values()) - assert ( - search.cv_results_["test_score"][search.best_index_] - >= search.cv_results_["test_score"] - ).all() - assert search.cv_results_["rank_test_score"][search.best_index_] == 1 + assert all(search.cv_results_["test_score"] >= 0) + assert all(search.cv_results_["rank_test_score"] >= 1) + assert all(search.cv_results_["partial_fit_calls"] >= 1) + assert len(np.unique(search.cv_results_["model_id"])) == len( + search.cv_results_["model_id"] + ) + X_, = yield c.compute([X]) proba = search.predict_proba(X_) From c13ea6249391a257a86ebf913d5754d6d343310c Mon Sep 17 00:00:00 2001 From: Scott Sievert Date: Mon, 15 Oct 2018 20:08:38 -0500 Subject: [PATCH 12/14] TST: if passive, return highest scoring model else sanity checks --- tests/model_selection/test_incremental.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 7c39e0906..3700a403a 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -1,6 +1,7 @@ import random import numpy as np +import pytest import toolz from dask.distributed import Future from distributed.utils_test import cluster, gen_cluster, loop # noqa: F401 @@ -195,13 +196,18 @@ def additional_calls(scores): @gen_cluster(client=True) def test_search_basic(c, s, a, b): + for decay_rate in {0, 1}: + _test_search_basic(decay_rate, c, s, a, b) + + +def _test_search_basic(decay_rate, c, s, a, b): X, y = make_classification(n_samples=1000, n_features=5, chunks=(100, 5)) model = SGDClassifier(tol=1e-3, loss="log", penalty="elasticnet") params = {"alpha": np.logspace(-2, 2, 100), "l1_ratio": np.linspace(0.01, 1, 200)} search = IncrementalSearchCV( - model, params, n_initial_parameters=20, max_iter=10, decay_rate=0 + model, params, n_initial_parameters=20, max_iter=10, decay_rate=decay_rate ) yield search.fit(X, y, classes=[0, 1]) @@ -228,8 +234,15 @@ def test_search_basic(c, s, a, b): }.issubset(set(search.cv_results_.keys())) assert all(isinstance(v, np.ndarray) for v in search.cv_results_.values()) - assert all(search.cv_results_["test_score"] >= 0) - assert all(search.cv_results_["rank_test_score"] >= 1) + if decay_rate == 0: + assert ( + search.cv_results_["test_score"][search.best_index_] + >= search.cv_results_["test_score"] + ).all() + assert search.cv_results_["rank_test_score"][search.best_index_] == 1 + else: + assert all(search.cv_results_["test_score"] >= 0) + assert all(search.cv_results_["rank_test_score"] >= 1) assert all(search.cv_results_["partial_fit_calls"] >= 1) assert 
len(np.unique(search.cv_results_["model_id"])) == len( search.cv_results_["model_id"] @@ -250,7 +263,6 @@ def test_search_plateau_patience(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) class ConstantClassifier(SGDClassifier): - def __init__(self, value=0): self.value = value super(ConstantClassifier, self).__init__(tol=1e-3) @@ -282,9 +294,7 @@ def score(self, *args, **kwargs): @gen_cluster(client=True, timeout=None) def test_search_plateau_tol(c, s, a, b): - class LinearFunction(BaseEstimator): - def __init__(self, intercept=0, slope=1, foo=0): self._num_calls = 0 self.intercept = intercept From 2cbf9bbcc03a076af9544828bb76dbc6c95c7a5f Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 16 Oct 2018 14:23:39 -0500 Subject: [PATCH 13/14] lint --- tests/model_selection/test_incremental.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 6682af541..5d828b746 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -1,7 +1,6 @@ import random import numpy as np -import pytest import toolz from dask.distributed import Future from distributed.utils_test import cluster, gen_cluster, loop # noqa: F401 From c08527fd57a3f41180eccd8cbb50c4cf13879477 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Wed, 17 Oct 2018 08:28:28 -0500 Subject: [PATCH 14/14] run tests --- tests/model_selection/test_incremental.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 37e8e96cd..c9600c92e 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -196,9 +196,10 @@ def additional_calls(scores): @gen_cluster(client=True) def test_search_basic(c, s, a, b): for decay_rate in {0, 1}: - _test_search_basic(decay_rate, c, s, a, b) + yield _test_search_basic(decay_rate, c, s, a, b) +@gen.coroutine def _test_search_basic(decay_rate, c, s, a, b): X, y = make_classification(n_samples=1000, n_features=5, chunks=(100, 5)) model = SGDClassifier(tol=1e-3, loss="log", penalty="elasticnet")
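
For reference, a minimal end-to-end sketch of the renamed estimator and the attributes added in this series (``cv_results_``, ``history_``, ``model_history_``). The dataset shape, chunking, and the exact parameter values below are illustrative assumptions pieced together from the docstring and test changes above, not part of the patches themselves; it also assumes a running distributed ``Client``, since these estimators require the optional ``distributed`` library.

    import numpy as np
    from dask.distributed import Client
    from sklearn.linear_model import SGDClassifier

    from dask_ml.datasets import make_classification
    from dask_ml.model_selection import IncrementalSearchCV

    client = Client()  # IncrementalSearchCV runs on a distributed scheduler

    # A "large" Dask-array dataset; sizes/chunks here are illustrative only
    X, y = make_classification(n_samples=100000, n_features=20,
                               chunks=10000, random_state=0)

    model = SGDClassifier(tol=1e-3, penalty="elasticnet", random_state=0)
    params = {"alpha": np.logspace(-2, 1, num=1000),
              "l1_ratio": np.linspace(0, 1, num=1000),
              "average": [True, False]}

    # patience/tol/max_iter stop models that plateau (PATCH 05);
    # decay_rate=0 keeps every surviving model training (PATCH 06/07)
    search = IncrementalSearchCV(model, params, random_state=0,
                                 n_initial_parameters=10,
                                 patience=5, tol=1e-3, max_iter=10)
    search.fit(X, y, classes=[0, 1])

    print(search.best_params_, search.best_score_)
    print(search.cv_results_["rank_test_score"])        # PATCH 03 / PATCH 10
    print(len(search.history_), len(search.model_history_))  # PATCH 02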