Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Apr 14, 2022
1 parent 8079026 commit 86d6469
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,13 @@ protected class RetryInfo implements Delayed {

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(System.nanoTime() - _retryTimeNs, TimeUnit.NANOSECONDS);
return unit.convert(_retryTimeNs - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed o) {
RetryInfo that = (RetryInfo) o;
return Long.compare(that._retryTimeNs, _retryTimeNs);
return Long.compare(_retryTimeNs, that._retryTimeNs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.failuredetector;

import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand All @@ -32,25 +33,41 @@ private FailureDetectorFactory() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorFactory.class);
private static final FailureDetector NO_OP_FAILURE_DETECTOR = new NoOpFailureDetector();

public static FailureDetector getFailureDetector(PinotConfiguration config, BrokerMetrics brokerMetrics) {
String className = config.getProperty(Broker.FailureDetector.CONFIG_OF_CLASS_NAME);
if (StringUtils.isEmpty(className)) {
LOGGER.info("Class name is not configured, falling back to NoOpFailureDetector");
return new NoOpFailureDetector();
} else {
LOGGER.info("Initializing failure detector with class: {}", className);
try {
FailureDetector failureDetector = PluginManager.get().createInstance(className);
String typeStr = config.getProperty(Broker.FailureDetector.CONFIG_OF_TYPE, Broker.FailureDetector.DEFAULT_TYPE);
Broker.FailureDetector.Type type;
try {
type = Broker.FailureDetector.Type.valueOf(typeStr.toUpperCase());
} catch (Exception e) {
throw new IllegalArgumentException("Illegal failure detector type: " + typeStr);
}
switch (type) {
case NO_OP:
return NO_OP_FAILURE_DETECTOR;
case CONNECTION: {
FailureDetector failureDetector = new ConnectionFailureDetector();
failureDetector.init(config, brokerMetrics);
LOGGER.info("Initialized failure detector with class: {}", className);
return failureDetector;
} catch (Exception e) {
LOGGER.error(
"Caught exception while initializing failure detector with class: {}, falling back to NoOpFailureDetector",
className);
return new NoOpFailureDetector();
}
case CUSTOM: {
String className = config.getProperty(Broker.FailureDetector.CONFIG_OF_CLASS_NAME);
Preconditions.checkArgument(!StringUtils.isEmpty(className),
"Failure detector class name must be configured for CUSTOM type");
LOGGER.info("Initializing CUSTOM failure detector with class: {}", className);
try {
FailureDetector failureDetector = PluginManager.get().createInstance(className);
failureDetector.init(config, brokerMetrics);
LOGGER.info("Initialized CUSTOM failure detector with class: {}", className);
return failureDetector;
} catch (Exception e) {
throw new RuntimeException(
"Caught exception while initializing CUSTOM failure detector with class: " + className, e);
}
}
default:
throw new IllegalStateException("Unsupported failure detector type: " + type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ConnectionFailureDetectorTest {
@BeforeClass
public void setUp() {
PinotConfiguration config = new PinotConfiguration();
config.setProperty(Broker.FailureDetector.CONFIG_OF_CLASS_NAME, ConnectionFailureDetector.class.getName());
config.setProperty(Broker.FailureDetector.CONFIG_OF_TYPE, Broker.FailureDetector.Type.CONNECTION.name());
config.setProperty(Broker.FailureDetector.CONFIG_OF_RETRY_INITIAL_DELAY_MS, 100);
config.setProperty(Broker.FailureDetector.CONFIG_OF_RETRY_DELAY_FACTOR, 1);
_brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.broker.failuredetector.ConnectionFailureDetector;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -64,7 +63,7 @@ protected int getNumReplicas() {

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(FailureDetector.CONFIG_OF_CLASS_NAME, ConnectionFailureDetector.class.getName());
brokerConf.setProperty(FailureDetector.CONFIG_OF_TYPE, FailureDetector.Type.CONNECTION.name());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@ public static class QueryOptionKey {
}

public static class FailureDetector {
public enum Type {
// Do not detect any failure
NO_OP,

// Detect connection failures
CONNECTION,

// Use the custom failure detector of the configured class name
CUSTOM
}

public static final String CONFIG_OF_TYPE = "pinot.broker.failure.detector.type";
public static final String DEFAULT_TYPE = Type.NO_OP.name();
public static final String CONFIG_OF_CLASS_NAME = "pinot.broker.failure.detector.class";

// Exponential backoff delay of retrying an unhealthy server when a failure is detected
Expand Down Expand Up @@ -510,11 +523,9 @@ public static class Segment {
public static class Realtime {
public enum Status {
// Means the segment is in CONSUMING state.
IN_PROGRESS,
// Means the segment is in ONLINE state (segment completed consuming and has been saved in
IN_PROGRESS, // Means the segment is in ONLINE state (segment completed consuming and has been saved in
// segment store).
DONE,
// Means the segment is uploaded to a Pinot controller by an external party.
DONE, // Means the segment is uploaded to a Pinot controller by an external party.
UPLOADED
}

Expand All @@ -525,8 +536,7 @@ public enum Status {
public enum CompletionMode {
// default behavior - if the in memory segment in the non-winner server is equivalent to the committed
// segment, then build and replace, else download
DEFAULT,
// non-winner servers always download the segment, never build it
DEFAULT, // non-winner servers always download the segment, never build it
DOWNLOAD
}

Expand Down

0 comments on commit 86d6469

Please sign in to comment.