Skip to content

Commit

Permalink
respond reviewer feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Nov 21, 2024
1 parent 8d2be71 commit 45190e9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void execute(ClusterEvent event) throws Exception {
long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout();
long stageStartTime = System.currentTimeMillis();
Set<String> participantsToDeregister = new HashSet<>();
long earliestDeregisterTime = Long.MAX_VALUE;
long nextDeregisterTime = Long.MAX_VALUE;


for (Map.Entry<String, Long> entry : offlineTimeMap.entrySet()) {
Expand All @@ -49,7 +49,7 @@ public void execute(ClusterEvent event) throws Exception {
long deregisterTime = offlineTime + deregisterDelay;

// Skip if instance is still online
if (offlineTime == ParticipantHistory.ONLINE) {
if (cache.getLiveInstances().containsKey(instanceName)) {
continue;
}

Expand All @@ -58,9 +58,7 @@ public void execute(ClusterEvent event) throws Exception {
participantsToDeregister.add(instanceName);
} else {
// Otherwise, find the next earliest deregister time
if (deregisterTime < earliestDeregisterTime) {
earliestDeregisterTime = deregisterTime;
}
nextDeregisterTime = Math.min(nextDeregisterTime, deregisterTime);
}
}

Expand All @@ -73,8 +71,8 @@ public void execute(ClusterEvent event) throws Exception {
}
}
// Schedule the next deregister task
if (earliestDeregisterTime != Long.MAX_VALUE) {
long delay = earliestDeregisterTime - stageStartTime;
if (nextDeregisterTime != Long.MAX_VALUE) {
long delay = Math.max(nextDeregisterTime - System.currentTimeMillis(), 0);
scheduleOnDemandPipeline(manager.getClusterName(), delay);
}
}
Expand Down Expand Up @@ -107,7 +105,7 @@ private Set<String> deregisterParticipants(HelixManager manager, ResourceControl
manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), instanceConfig);
successfullyDeregisteredInstances.add(instanceName);
} catch (HelixException e) {
LOG.error("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e);
LOG.warn("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e);
}
}

Expand Down
14 changes: 4 additions & 10 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ public enum ClusterConfigProperty {

// List of Preferred scoring keys used in evenness score computation
PREFERRED_SCORING_KEYS,
PARTICIPANT_DEREGISTRATION_ENABLED,
PARTICIPANT_DEREGISTRATION_TIMEOUT
}

Expand Down Expand Up @@ -1258,15 +1257,6 @@ public void setPreferredScoringKeys(List<String> preferredScoringKeys) {
preferredScoringKeys);
}

public boolean isParticipantDeregistrationEnabled() {
return _record.getBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(),
false);
}

public void setParticipantDeregistrationEnabled(boolean enabled) {
_record.setBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(), enabled);
}

public long getParticipantDeregistrationTimeout() {
return _record.getLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(),
-1);
Expand All @@ -1275,4 +1265,8 @@ public long getParticipantDeregistrationTimeout() {
public void setParticipantDeregistrationTimeout(long timeout) {
_record.setLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(), timeout);
}

public boolean isParticipantDeregistrationEnabled() {
return getParticipantDeregistrationTimeout() > -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void beforeClass() {
_admin = _gSetupTool.getClusterManagementTool();
_dataAccessor = _controller.getHelixDataAccessor();

setAutoDeregisterConfigs(CLUSTER_NAME, true, DEREGISTER_TIMEOUT);
setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
}

// Asserts that a node will be removed from the cluster after it exceedsthe deregister timeout set in the cluster config
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testDeregisterAfterConfigEnabled() throws Exception {

// Set to deregister to disabled
long testDeregisterTimeout = 1000;
setAutoDeregisterConfigs(CLUSTER_NAME, false, testDeregisterTimeout);
setAutoDeregisterConfigs(CLUSTER_NAME, -1);

// Create and immediately kill participants
List<MockParticipantManager> killedParticipants = new ArrayList<>();
Expand All @@ -182,7 +182,7 @@ public void testDeregisterAfterConfigEnabled() throws Exception {
// Sleep so that participant offline time exceeds deregister timeout
Thread.sleep(testDeregisterTimeout);
// Trigger on disable --> enable deregister
setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout);
setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);

// Assert participants have been deregistered
boolean result = TestHelper.verify(() -> {
Expand All @@ -208,7 +208,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception {
+ new Date(System.currentTimeMillis()));
long longDeregisterTimeout = 1000*60*60*24;
long shortDeregisterTimeout = 1000;
setAutoDeregisterConfigs(CLUSTER_NAME, true, longDeregisterTimeout);
setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);

// Create and immediately kill participants
List<MockParticipantManager> killedParticipants = new ArrayList<>();
Expand All @@ -222,7 +222,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception {
Thread.sleep(shortDeregisterTimeout);

// Trigger on shorten deregister timeout
setAutoDeregisterConfigs(CLUSTER_NAME, true, shortDeregisterTimeout);
setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);

// Assert participants have been deregistered
boolean result = TestHelper.verify(() -> {
Expand Down Expand Up @@ -253,9 +253,8 @@ public MockParticipantManager addParticipant(String clusterName, String instance
return toAddParticipant;
}

private void setAutoDeregisterConfigs(String clusterName, boolean enabled, long timeout) {
private void setAutoDeregisterConfigs(String clusterName, long timeout) {
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setParticipantDeregistrationEnabled(enabled);
clusterConfig.setParticipantDeregistrationTimeout(timeout);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Allow participant to ensure compatibility with nodes re-joining when they re-establish connection
Expand Down

0 comments on commit 45190e9

Please sign in to comment.