diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java index 1cd7d34ec6..e2817d4037 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java @@ -40,16 +40,17 @@ public void execute(ClusterEvent event) throws Exception { long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout(); long stageStartTime = System.currentTimeMillis(); Set participantsToDeregister = new HashSet<>(); - long earliestDeregisterTime = Long.MAX_VALUE; + long nextDeregisterTime = Long.MAX_VALUE; for (Map.Entry entry : offlineTimeMap.entrySet()) { String instanceName = entry.getKey(); Long offlineTime = entry.getValue(); long deregisterTime = offlineTime + deregisterDelay; + cache.getLiveInstances().get(instanceName); // Skip if instance is still online - if (offlineTime == ParticipantHistory.ONLINE) { + if (cache.getLiveInstances().containsKey(instanceName)) { continue; } @@ -58,9 +59,7 @@ public void execute(ClusterEvent event) throws Exception { participantsToDeregister.add(instanceName); } else { // Otherwise, find the next earliest deregister time - if (deregisterTime < earliestDeregisterTime) { - earliestDeregisterTime = deregisterTime; - } + nextDeregisterTime = Math.min(nextDeregisterTime, deregisterTime); } } @@ -73,8 +72,8 @@ public void execute(ClusterEvent event) throws Exception { } } // Schedule the next deregister task - if (earliestDeregisterTime != Long.MAX_VALUE) { - long delay = earliestDeregisterTime - stageStartTime; + if (nextDeregisterTime != Long.MAX_VALUE) { + long delay = nextDeregisterTime - System.currentTimeMillis(); scheduleOnDemandPipeline(manager.getClusterName(), delay); } } @@ -107,7 +106,7 @@ private Set deregisterParticipants(HelixManager manager, ResourceControl manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), instanceConfig); successfullyDeregisteredInstances.add(instanceName); } catch (HelixException e) { - LOG.error("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e); + LOG.warn("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e); } } diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index fc8a5b34c5..56cde31430 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -166,7 +166,6 @@ public enum ClusterConfigProperty { // List of Preferred scoring keys used in evenness score computation PREFERRED_SCORING_KEYS, - PARTICIPANT_DEREGISTRATION_ENABLED, PARTICIPANT_DEREGISTRATION_TIMEOUT } @@ -1258,15 +1257,6 @@ public void setPreferredScoringKeys(List preferredScoringKeys) { preferredScoringKeys); } - public boolean isParticipantDeregistrationEnabled() { - return _record.getBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(), - false); - } - - public void setParticipantDeregistrationEnabled(boolean enabled) { - _record.setBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(), enabled); - } - public long getParticipantDeregistrationTimeout() { return _record.getLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(), -1); @@ -1275,4 +1265,8 @@ public long getParticipantDeregistrationTimeout() { public void setParticipantDeregistrationTimeout(long timeout) { _record.setLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(), timeout); } + + public boolean isParticipantDeregistrationEnabled() { + return getParticipantDeregistrationTimeout() > -1; + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java index 5512b66712..7e9bfabca2 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java @@ -52,7 +52,7 @@ public void beforeClass() { _admin = _gSetupTool.getClusterManagementTool(); _dataAccessor = _controller.getHelixDataAccessor(); - setAutoDeregisterConfigs(CLUSTER_NAME, true, DEREGISTER_TIMEOUT); + setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); } // Asserts that a node will be removed from the cluster after it exceedsthe deregister timeout set in the cluster config @@ -169,7 +169,7 @@ public void testDeregisterAfterConfigEnabled() throws Exception { // Set to deregister to disabled long testDeregisterTimeout = 1000; - setAutoDeregisterConfigs(CLUSTER_NAME, false, testDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, -1); // Create and immediately kill participants List killedParticipants = new ArrayList<>(); @@ -182,7 +182,7 @@ public void testDeregisterAfterConfigEnabled() throws Exception { // Sleep so that participant offline time exceeds deregister timeout Thread.sleep(testDeregisterTimeout); // Trigger on disable --> enable deregister - setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); // Assert participants have been deregistered boolean result = TestHelper.verify(() -> { @@ -208,7 +208,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception { + new Date(System.currentTimeMillis())); long longDeregisterTimeout = 1000*60*60*24; long shortDeregisterTimeout = 1000; - setAutoDeregisterConfigs(CLUSTER_NAME, true, longDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); // Create and immediately kill participants List killedParticipants = new ArrayList<>(); @@ -222,7 +222,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception { Thread.sleep(shortDeregisterTimeout); // Trigger on shorten deregister timeout - setAutoDeregisterConfigs(CLUSTER_NAME, true, shortDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); // Assert participants have been deregistered boolean result = TestHelper.verify(() -> { @@ -253,9 +253,8 @@ public MockParticipantManager addParticipant(String clusterName, String instance return toAddParticipant; } - private void setAutoDeregisterConfigs(String clusterName, boolean enabled, long timeout) { + private void setAutoDeregisterConfigs(String clusterName, long timeout) { ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); - clusterConfig.setParticipantDeregistrationEnabled(enabled); clusterConfig.setParticipantDeregistrationTimeout(timeout); _configAccessor.setClusterConfig(clusterName, clusterConfig); // Allow participant to ensure compatibility with nodes re-joining when they re-establish connection