Skip to content

Commit

Permalink
fix: Fix unintentional forwarding to failing instances
Browse files Browse the repository at this point in the history
Motivation

A bug was discovered where inference requests are incorrectly forwarded to failing instances in the case that (a) there are instances in the failing list (b) the instance doing the routing is the one that _should_ process the request.

This was due to how the custom litelinks loadbalancers used by model-mesh return null to indicate that the RPC should be aborted and instead handled by the local instance. In these cases litelinks will first attempt sending to failing instances if there are any.

Modifications

Take advantage of new ABORT_REQUEST functionality added to litelinks 1.7.2. The load balancer logic now returns ABORT_REQUEST rather than null when the local instance has been chosen to handle the request.

Also make some local variables final within the LB implementations to aid with readability.

Result

Less disruptive behaviour in some failure modes.

Signed-off-by: Nick Hill <[email protected]>
  • Loading branch information
njhill authored and pvaneck committed Feb 11, 2022
1 parent 9a143ee commit 99438b4
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/main/java/com/ibm/watson/modelmesh/ModelMesh.java
Original file line number Diff line number Diff line change
Expand Up @@ -4218,7 +4218,7 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {
final boolean excludeSelf = filtered.excludeSelf, preferSelf = filtered.preferSelf;
boolean seenSelf = false;
Map<String, ServiceInstanceInfo> siMap = getMap(sis);
long now = currentTimeMillis();
final long now = currentTimeMillis();
ServiceInstanceInfo chosen = null;
String chosenId = null;
long chosenTimeStamp = 0L;
Expand All @@ -4237,7 +4237,7 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {
}
us = true;
}
ServiceInstanceInfo sii = siMap.get(iid);
final ServiceInstanceInfo sii = siMap.get(iid);
if (sii == null) {
// filtered.add(ent); // don't filter not-found ones
continue;
Expand All @@ -4248,12 +4248,12 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {
}
if (loadStarted < stillLoadingCutoffTime) {
// balanced selection logic
ServiceInstance<?> si = (ServiceInstance<?>) sii;
int inuse = us ? localInvokesInFlight.get() : si.getInUseCount();
final ServiceInstance<?> si = (ServiceInstance<?>) sii;
final int inuse = us ? localInvokesInFlight.get() : si.getInUseCount();
if (inuse > min) {
continue;
}
long nlu = us ? (preferSelf ? 0L : lastInvokeTime) : si.getLastUsedTime();
final long nlu = us ? (preferSelf ? 0L : lastInvokeTime) : si.getLastUsedTime();
if (inuse < min) {
min = inuse;
} else if (nlu >= lru) {
Expand All @@ -4275,9 +4275,9 @@ else if (min == Integer.MAX_VALUE && loadStarted < firstStarted) {
}
if (chosen != null) {
// if unbalanced then we might be choosing ourself here,
// return null (=> ServiceUnavailableException) to indicate this
// return ABORT_REQUEST (=> ServiceUnavailableException) to indicate this
if (!excludeSelf && instanceId.equals(chosenId)) {
return null;
return (T) LoadBalancer.ABORT_REQUEST;
}
if (sendDestinationId) {
ensureContextMapIsMutable(ThreadContext.getCurrentContext()).put(DEST_INST_ID_KEY, chosenId);
Expand Down Expand Up @@ -4675,7 +4675,7 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {
// Those "seen" form a shortlist for the subsequent random choice.

// Filter the iterator based on known hard constraints/exclusions
Set<String> constrainTo = typeConstraints != null ?
final Set<String> constrainTo = typeConstraints != null ?
typeConstraints.getCandidateInstances(exclude.modelType) : null;

final ObjectLongMap<String> excludeReplicaSets = upgradeTracker.getLikelyReplacedReplicaSets();
Expand Down Expand Up @@ -4779,14 +4779,14 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {
if (simpleCase) {
if (us && favourSelf) {
// If we are a candidate ourselves and the favourSelf flag is set,
// choose ourselves immediately (returning null indicates this)
return null;
// choose ourselves immediately (returning ABORT_REQUEST indicates this)
return (T) LoadBalancer.ABORT_REQUEST;
}
candidates.add(bestIid);
instReqLoad.add(bestInst.getReqsPerMinute());

// Simple case, don't need to weigh preferred
long oldest = bestInst.getLruTime();
final long oldest = bestInst.getLruTime();
while (it.hasNext()) {
Entry<String, InstanceRecord> ent = it.next();
String iid = ent.getKey();
Expand Down Expand Up @@ -4818,15 +4818,15 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {
}

if (us && favourSelf) {
return null; // this is to indicate that we should be chosen
return (T) LoadBalancer.ABORT_REQUEST; // this is to indicate that we should be chosen
}

candidates.add(iid);
instReqLoad.add(curInst.getReqsPerMinute());
}
}

int ccount = candidates.size();
final int ccount = candidates.size();
if (ccount == 0) {
return null;
}
Expand All @@ -4837,7 +4837,7 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {

// note this might be negative in the case of load-triggered
// scale-up (see scale-up logic in rateTrackingTask())
long lastUsedAgo = age(exclude.lastUsedTime);
final long lastUsedAgo = age(exclude.lastUsedTime);
String chosenInstId = null;
if (ccount == 1) {
chosenInstId = candidates.get(0);
Expand Down Expand Up @@ -4876,7 +4876,7 @@ public <T> T getNext(Object[] sis, String method, Object[] args) {
}
// candidates.clear(); //TODO only if reused
if (!favourSelf && instanceId.equals(chosenInstId)) {
return null; // indicates we are chosen
return (T) LoadBalancer.ABORT_REQUEST; // indicates we are chosen
}
boolean exclusions = !exclude.isEmpty();
if (exclusions || sendDestinationId) {
Expand Down

0 comments on commit 99438b4

Please sign in to comment.