diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java index 78839ad1b241..b3c7639cfd97 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java @@ -178,13 +178,13 @@ protected class RetryInfo implements Delayed { @Override public long getDelay(TimeUnit unit) { - return unit.convert(System.nanoTime() - _retryTimeNs, TimeUnit.NANOSECONDS); + return unit.convert(_retryTimeNs - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { RetryInfo that = (RetryInfo) o; - return Long.compare(that._retryTimeNs, _retryTimeNs); + return Long.compare(_retryTimeNs, that._retryTimeNs); } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java index 718c064d08da..712be7bb5a64 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.broker.failuredetector; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.spi.env.PinotConfiguration; @@ -32,25 +33,41 @@ private FailureDetectorFactory() { } private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorFactory.class); + private static final FailureDetector NO_OP_FAILURE_DETECTOR = new NoOpFailureDetector(); public static FailureDetector getFailureDetector(PinotConfiguration config, BrokerMetrics brokerMetrics) { - String className = config.getProperty(Broker.FailureDetector.CONFIG_OF_CLASS_NAME); - if (StringUtils.isEmpty(className)) { - LOGGER.info("Class name is not configured, falling back to NoOpFailureDetector"); - return new NoOpFailureDetector(); - } else { - LOGGER.info("Initializing failure detector with class: {}", className); - try { - FailureDetector failureDetector = PluginManager.get().createInstance(className); + String typeStr = config.getProperty(Broker.FailureDetector.CONFIG_OF_TYPE, Broker.FailureDetector.DEFAULT_TYPE); + Broker.FailureDetector.Type type; + try { + type = Broker.FailureDetector.Type.valueOf(typeStr.toUpperCase()); + } catch (Exception e) { + throw new IllegalArgumentException("Illegal failure detector type: " + typeStr); + } + switch (type) { + case NO_OP: + return NO_OP_FAILURE_DETECTOR; + case CONNECTION: { + FailureDetector failureDetector = new ConnectionFailureDetector(); failureDetector.init(config, brokerMetrics); - LOGGER.info("Initialized failure detector with class: {}", className); return failureDetector; - } catch (Exception e) { - LOGGER.error( - "Caught exception while initializing failure detector with class: {}, falling back to NoOpFailureDetector", - className); - return new NoOpFailureDetector(); } + case CUSTOM: { + String className = config.getProperty(Broker.FailureDetector.CONFIG_OF_CLASS_NAME); + Preconditions.checkArgument(!StringUtils.isEmpty(className), + "Failure detector class name must be configured for CUSTOM type"); + LOGGER.info("Initializing CUSTOM failure detector with class: {}", className); + try { + FailureDetector failureDetector = PluginManager.get().createInstance(className); + failureDetector.init(config, brokerMetrics); + LOGGER.info("Initialized CUSTOM failure detector with class: {}", className); + return failureDetector; + } catch (Exception e) { + throw new RuntimeException( + "Caught exception while initializing CUSTOM failure detector with class: " + className, e); + } + } + default: + throw new IllegalStateException("Unsupported failure detector type: " + type); } } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java index 38e92f94d472..ad4c88d7c957 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java @@ -50,7 +50,7 @@ public class ConnectionFailureDetectorTest { @BeforeClass public void setUp() { PinotConfiguration config = new PinotConfiguration(); - config.setProperty(Broker.FailureDetector.CONFIG_OF_CLASS_NAME, ConnectionFailureDetector.class.getName()); + config.setProperty(Broker.FailureDetector.CONFIG_OF_TYPE, Broker.FailureDetector.Type.CONNECTION.name()); config.setProperty(Broker.FailureDetector.CONFIG_OF_RETRY_INITIAL_DELAY_MS, 100); config.setProperty(Broker.FailureDetector.CONFIG_OF_RETRY_DELAY_FACTOR, 1); _brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index 94169a18ba9b..d64c7321618c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -24,7 +24,6 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.pinot.broker.broker.helix.BaseBrokerStarter; -import org.apache.pinot.broker.failuredetector.ConnectionFailureDetector; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.env.PinotConfiguration; @@ -64,7 +63,7 @@ protected int getNumReplicas() { @Override protected void overrideBrokerConf(PinotConfiguration brokerConf) { - brokerConf.setProperty(FailureDetector.CONFIG_OF_CLASS_NAME, ConnectionFailureDetector.class.getName()); + brokerConf.setProperty(FailureDetector.CONFIG_OF_TYPE, FailureDetector.Type.CONNECTION.name()); } @Test diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index e45d32342690..eb1980d3796d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -251,6 +251,19 @@ public static class QueryOptionKey { } public static class FailureDetector { + public enum Type { + // Do not detect any failure + NO_OP, + + // Detect connection failures + CONNECTION, + + // Use the custom failure detector of the configured class name + CUSTOM + } + + public static final String CONFIG_OF_TYPE = "pinot.broker.failure.detector.type"; + public static final String DEFAULT_TYPE = Type.NO_OP.name(); public static final String CONFIG_OF_CLASS_NAME = "pinot.broker.failure.detector.class"; // Exponential backoff delay of retrying an unhealthy server when a failure is detected @@ -510,11 +523,9 @@ public static class Segment { public static class Realtime { public enum Status { // Means the segment is in CONSUMING state. - IN_PROGRESS, - // Means the segment is in ONLINE state (segment completed consuming and has been saved in + IN_PROGRESS, // Means the segment is in ONLINE state (segment completed consuming and has been saved in // segment store). - DONE, - // Means the segment is uploaded to a Pinot controller by an external party. + DONE, // Means the segment is uploaded to a Pinot controller by an external party. UPLOADED } @@ -525,8 +536,7 @@ public enum Status { public enum CompletionMode { // default behavior - if the in memory segment in the non-winner server is equivalent to the committed // segment, then build and replace, else download - DEFAULT, - // non-winner servers always download the segment, never build it + DEFAULT, // non-winner servers always download the segment, never build it DOWNLOAD }