MAINT, BUG, TST: incremental API cleaning #406

Merged · 17 commits · Oct 19, 2018
Changes from all commits
24 changes: 16 additions & 8 deletions dask_ml/model_selection/_incremental.py
@@ -851,6 +851,7 @@ def _get_params(self):
         return ParameterSampler(self.parameters, self.n_initial_parameters)
 
     def _additional_calls(self, info):
+        # First, have an adaptive algorithm
         if self.n_initial_parameters == "grid":
             start = len(ParameterGrid(self.parameters))
         else:
@@ -883,20 +884,27 @@ def inverse(time):
         if len(best) == 1:
             [best] = best
             return {best: 0}
+        steps = next_time_step - current_time_step
+        instructions = {b: steps for b in best}
 
+        # Second, stop on plateau if any models have already converged
         out = {}
-        for k in best:
+        for k, steps in instructions.items():
             records = info[k]
-            if self.max_iter and len(records) >= self.max_iter:
+            current_calls = records[-1]["partial_fit_calls"]
+            if self.max_iter and current_calls >= self.max_iter:
                 out[k] = 0
-            elif self.patience and len(records) >= self.patience:
-                old = records[-self.patience]["score"]
-                if all(d["score"] < old + self.tol for d in records[-self.patience :]):
+            elif self.patience and current_calls >= self.patience:
+                plateau = [
+                    h["score"]
+                    for h in records
+                    if current_calls - h["partial_fit_calls"] <= self.patience
+                ]
+                if all(score <= plateau[0] + self.tol for score in plateau[1:]):
                     out[k] = 0
                 else:
-                    out[k] = next_time_step - current_time_step
+                    out[k] = steps
 
             else:
-                out[k] = next_time_step - current_time_step
-
+                out[k] = steps
         return out
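A note on the new plateau rule above: instead of counting history entries with len(records), the new code measures patience in units of partial_fit calls, collects every score within a patience-sized window of the newest record, and stops a model when no score in that window beats the window's first score by more than tol. Below is a minimal standalone sketch of that rule; only the record layout ("partial_fit_calls", "score") and the patience/tol semantics come from the diff, the data is illustrative:

    # Toy stand-in for one model's info[k] history: cumulative partial_fit
    # calls and the score observed at each scoring round.
    records = [
        {"partial_fit_calls": 1, "score": 0.50},
        {"partial_fit_calls": 2, "score": 0.80},
        {"partial_fit_calls": 3, "score": 0.81},
        {"partial_fit_calls": 4, "score": 0.80},
        {"partial_fit_calls": 5, "score": 0.81},
    ]
    patience, tol = 3, 0.05

    current_calls = records[-1]["partial_fit_calls"]
    # Keep the scores whose call count lies within `patience` calls of the newest.
    plateau = [
        h["score"]
        for h in records
        if current_calls - h["partial_fit_calls"] <= patience
    ]
    # Converged when no later score beats the window's first score by more than tol.
    converged = all(score <= plateau[0] + tol for score in plateau[1:])
    print(plateau)    # [0.8, 0.81, 0.8, 0.81]
    print(converged)  # True -> this model would be assigned 0 further calls

This is likely the BUG part of the title: len(records) only equals the number of partial_fit calls when a model is trained one call per scoring round, so the old records[-self.patience:] window drifted whenever a round covered several calls.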
83 changes: 71 additions & 12 deletions tests/model_selection/test_incremental.py
@@ -4,6 +4,7 @@
 import toolz
 from dask.distributed import Future
 from distributed.utils_test import cluster, gen_cluster, loop  # noqa: F401
+from sklearn.base import BaseEstimator
 from sklearn.cluster import MiniBatchKMeans
 from sklearn.linear_model import SGDClassifier
 from sklearn.model_selection import ParameterGrid, ParameterSampler
@@ -194,12 +195,20 @@ def additional_calls(scores):
 
 @gen_cluster(client=True)
 def test_search_basic(c, s, a, b):
+    for decay_rate in {0, 1}:
+        yield _test_search_basic(decay_rate, c, s, a, b)
+
+
+@gen.coroutine
+def _test_search_basic(decay_rate, c, s, a, b):
     X, y = make_classification(n_samples=1000, n_features=5, chunks=(100, 5))
     model = SGDClassifier(tol=1e-3, loss="log", penalty="elasticnet")
 
     params = {"alpha": np.logspace(-2, 2, 100), "l1_ratio": np.linspace(0.01, 1, 200)}
 
-    search = IncrementalSearchCV(model, params, n_initial_parameters=20, max_iter=10)
+    search = IncrementalSearchCV(
+        model, params, n_initial_parameters=20, max_iter=10, decay_rate=decay_rate
+    )
     yield search.fit(X, y, classes=[0, 1])
 
     assert search.history_
@@ -225,8 +234,15 @@ def test_search_basic(c, s, a, b):
     }.issubset(set(search.cv_results_.keys()))
 
     assert all(isinstance(v, np.ndarray) for v in search.cv_results_.values())
-    assert all(search.cv_results_["test_score"] >= 0)
-    assert all(search.cv_results_["rank_test_score"] >= 1)
+    if decay_rate == 0:
+        assert (
+            search.cv_results_["test_score"][search.best_index_]
+            >= search.cv_results_["test_score"]
+        ).all()
+        assert search.cv_results_["rank_test_score"][search.best_index_] == 1
+    else:
+        assert all(search.cv_results_["test_score"] >= 0)
+        assert all(search.cv_results_["rank_test_score"] >= 1)
     assert all(search.cv_results_["partial_fit_calls"] >= 1)
     assert len(np.unique(search.cv_results_["model_id"])) == len(
         search.cv_results_["model_id"]
@@ -252,16 +268,19 @@ def test_search_basic(c, s, a, b):
 
 
 @gen_cluster(client=True, timeout=None)
-def test_search_patience(c, s, a, b):
+def test_search_plateau_patience(c, s, a, b):
     X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
 
     class ConstantClassifier(SGDClassifier):
-        def score(*args, **kwargs):
-            return 0.5
+        def __init__(self, value=0):
+            self.value = value
+            super(ConstantClassifier, self).__init__(tol=1e-3)
 
-    model = ConstantClassifier(tol=1e-3)
+        def score(self, *args, **kwargs):
+            return self.value
 
-    params = {"alpha": np.logspace(-2, 10, 100), "l1_ratio": np.linspace(0.01, 1, 200)}
+    params = {"value": np.random.rand(10)}
+    model = ConstantClassifier()
 
     search = IncrementalSearchCV(
         model, params, n_initial_parameters=10, patience=5, tol=0, max_iter=10
@@ -270,17 +289,57 @@ def score(*args, **kwargs):
 
     assert search.history_
     for h in search.history_:
-        assert isinstance(search.best_estimator_, SGDClassifier)
-        assert search.best_score_ > 0
-        assert "visualize" not in search.__dict__
-        assert h["partial_fit_calls"] <= 3
+        assert h["partial_fit_calls"] <= 5
+    assert isinstance(search.best_estimator_, SGDClassifier)
+    assert search.best_score_ == params["value"].max() == search.best_estimator_.value
+    assert "visualize" not in search.__dict__
+    assert search.best_score_ > 0
 
     X_test, y_test = yield c.compute([X, y])
 
     search.predict(X_test)
     search.score(X_test, y_test)
 
 
+@gen_cluster(client=True, timeout=None)
+def test_search_plateau_tol(c, s, a, b):
+    class LinearFunction(BaseEstimator):
+        def __init__(self, intercept=0, slope=1, foo=0):
+            self._num_calls = 0
+            self.intercept = intercept
+            self.slope = slope
+            super(LinearFunction, self).__init__()
+
+        def fit(self, *args):
+            return self
+
+        def partial_fit(self, *args, **kwargs):
+            self._num_calls += 1
+            return self
+
+        def score(self, *args, **kwargs):
+            return self.intercept + self.slope * self._num_calls
+
+    model = LinearFunction(slope=1)
+    params = {"foo": np.linspace(0, 1)}
+
+    # Every 3 calls, the score increases by 3. tol=1: the model improved enough
+    search = IncrementalSearchCV(
+        model, params, patience=3, tol=1, max_iter=10, decay_rate=0
+    )
+    X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
+    yield search.fit(X, y)
+    assert set(search.cv_results_["partial_fit_calls"]) == {10}
+
+    # Every 3 calls, the score increases by 3. tol=4: the model didn't improve enough
+    search = IncrementalSearchCV(
+        model, params, patience=3, tol=4, decay_rate=0, max_iter=10
+    )
+    X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
+    yield search.fit(X, y)
+    assert set(search.cv_results_["partial_fit_calls"]) == {3}
+
+
 @gen_cluster(client=True)
 def test_search_max_iter(c, s, a, b):
     X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
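Why test_search_plateau_tol expects {10} and then {3}: with slope=1 and intercept=0, LinearFunction's score after n partial_fit calls is exactly n, so any patience window of 3 calls ends 3 higher than it starts. A quick worked check of the plateau rule against that schedule (the plateaued helper is illustrative, not part of the test suite):

    # Score of LinearFunction(slope=1) after n partial_fit calls is n, so the
    # patience=3 window at call n holds the scores [n - 3, ..., n].
    def plateaued(n, patience=3, tol=0.0):
        window = list(range(n - patience, n + 1))
        # Same rule as _additional_calls: converged when no score in the
        # window beats the window's first score by more than tol.
        return all(score <= window[0] + tol for score in window[1:])

    # tol=1: the gain over any window is 3 > 1, so a model never plateaus and
    # runs to max_iter=10 -> partial_fit_calls == {10}.
    print(plateaued(3, tol=1))  # False

    # tol=4: a gain of 3 never exceeds 4, so a model stops at the first check,
    # patience=3 calls in -> partial_fit_calls == {3}.
    print(plateaued(3, tol=4))  # True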