Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add version-consistent result rounding to load_balance_peers #230

Merged
merged 9 commits
Apr 19, 2021
20 changes: 12 additions & 8 deletions hivemind/client/averaging/load_balancing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

logger = get_logger(__name__)

LOAD_BALANCING_LP_DECIMALS = 9


def load_balance_peers(vector_size, throughputs: Sequence[Optional[float]], min_size: int = 0) -> Tuple[int, ...]:
"""
Expand All @@ -29,7 +31,7 @@ def load_balance_peers(vector_size, throughputs: Sequence[Optional[float]], min_
return tuple(hagenbach_bishoff(vector_size, scores))


def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int = 0, eps: float = 1e-15) -> np.ndarray:
def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int = 0) -> np.ndarray:
"""
This method solves an optimization problem to minimize the total allreduce time.
In butterfly all-reduce, each peer acts both as a "client" and as an "aggregator":
Expand All @@ -47,35 +49,37 @@ def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int =
:returns: a vector of "scores", i-th score is proportional to the fraction of weights assigned to i-th peer
"""
assert np.all(throughputs >= 0) and np.any(throughputs > 0)
throughputs = np.asarray(throughputs, dtype=np.float64)
permutation = np.argsort(-throughputs)
throughputs = throughputs[permutation]
is_nonzero = throughputs != 0

group_size = len(throughputs)
num_variables = group_size + 1 # [w_1, ..., w_N, xi]

c = np.zeros(num_variables)
c = np.zeros(num_variables, dtype=np.float64)
c[-1] = 1.0 # optimize w.r.t. xi

# the constraints below are tuples (A, b) such that Ax <= b
nonnegative_weights = -np.eye(group_size, M=num_variables), np.zeros(group_size)
nonnegative_weights = -np.eye(group_size, num_variables, dtype=c.dtype), np.zeros(group_size, c.dtype)
weights_sum_to_one = c[None, :] - 1.0, np.array([-1.0])
coeff_per_variable = (group_size - 2.0) / np.maximum(throughputs, eps)
coeff_matrix_minus_xi = np.hstack([np.diag(coeff_per_variable), -np.ones((group_size, 1))])
coeff_per_variable = (group_size - 2.0) / np.maximum(throughputs, 10 ** -LOAD_BALANCING_LP_DECIMALS)
coeff_matrix_minus_xi = np.hstack([np.diag(coeff_per_variable), -np.ones((group_size, 1), c.dtype)])
xi_is_maximum = coeff_matrix_minus_xi[is_nonzero], -1.0 / throughputs[is_nonzero]
force_max_weights = np.eye(group_size, M=num_variables), is_nonzero.astype(c.dtype)
force_max_weights = np.eye(group_size, M=num_variables, dtype=c.dtype), is_nonzero.astype(c.dtype)

A, b = list(map(np.concatenate, zip(nonnegative_weights, weights_sum_to_one, xi_is_maximum, force_max_weights)))

solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b)
solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b, method='interior-point')
if solution.success:
peer_scores = solution.x[:group_size]
# if some peers have less than min_size elements, transfer their share to other peers (if any)
if np.max(peer_scores) >= min_size / float(vector_size):
peer_scores[peer_scores < min_size / float(vector_size)] = 0.0
peer_scores = np.round(peer_scores, LOAD_BALANCING_LP_DECIMALS)
else:
logger.error(f"Failed to solve load-balancing for bandwidths {throughputs}.")
peer_scores = np.ones(group_size)
peer_scores = np.ones(group_size, c.dtype)

return peer_scores[np.argsort(permutation)]

Expand Down