Address comments
Jackie-Jiang committed Apr 13, 2022
1 parent 160dbd9 commit 8079026
Showing 9 changed files with 119 additions and 103 deletions.
@@ -20,9 +20,10 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerGauge;
@@ -39,10 +40,12 @@
*/
@ThreadSafe
public abstract class BaseExponentialBackoffRetryFailureDetector implements FailureDetector {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
private static final Logger LOGGER = LoggerFactory.getLogger(BaseExponentialBackoffRetryFailureDetector.class);

protected final String _name = getClass().getSimpleName();
protected final List<Listener> _listeners = new ArrayList<>();
protected final ConcurrentHashMap<String, RetryInfo> _unhealthyServerRetryInfoMap = new ConcurrentHashMap<>();
protected final DelayQueue<RetryInfo> _retryInfoDelayQueue = new DelayQueue<>();

protected BrokerMetrics _brokerMetrics;
protected long _retryInitialDelayNs;
@@ -62,8 +65,8 @@ public void init(PinotConfiguration config, BrokerMetrics brokerMetrics) {
Broker.FailureDetector.DEFAULT_RETRY_DELAY_FACTOR);
_maxRetries =
config.getProperty(Broker.FailureDetector.CONFIG_OF_MAX_RETRIES, Broker.FailureDetector.DEFAULT_MAX_RETIRES);
_logger.info("Initialized {} with retry initial delay: {}ms, exponential backoff factor: {}, max " + "retries: {}",
_name, retryInitialDelayMs, _retryDelayFactor, _maxRetries);
LOGGER.info("Initialized {} with retry initial delay: {}ms, exponential backoff factor: {}, max retries: {}", _name,
retryInitialDelayMs, _retryDelayFactor, _maxRetries);
}

@Override
@@ -73,41 +76,36 @@ public void register(Listener listener) {

@Override
public void start() {
_logger.info("Starting {}", _name);
LOGGER.info("Starting {}", _name);
_running = true;

_retryThread = new Thread(() -> {
while (_running) {
try {
long earliestRetryTimeNs = System.nanoTime() + _retryInitialDelayNs;
for (Map.Entry<String, RetryInfo> entry : _unhealthyServerRetryInfoMap.entrySet()) {
RetryInfo retryInfo = entry.getValue();
if (System.nanoTime() > retryInfo._retryTimeNs) {
String instanceId = entry.getKey();
if (retryInfo._numRetires == _maxRetries) {
_logger.warn(
"Unhealthy server: {} already reaches the max retries: {}, do not retry again and treat it as "
+ "healthy so that the listeners do not lose track of the server", instanceId, _maxRetries);
markServerHealthy(instanceId);
continue;
}
_logger.info("Retry unhealthy server: {}", instanceId);
for (Listener listener : _listeners) {
listener.retryUnhealthyServer(instanceId, this);
}
// Update the retry info
retryInfo._retryDelayNs = (long) (retryInfo._retryDelayNs * _retryDelayFactor);
retryInfo._retryTimeNs = System.nanoTime() + retryInfo._retryDelayNs;
retryInfo._numRetires++;
} else {
earliestRetryTimeNs = Math.min(earliestRetryTimeNs, retryInfo._retryTimeNs);
}
RetryInfo retryInfo = _retryInfoDelayQueue.take();
String instanceId = retryInfo._instanceId;
if (_unhealthyServerRetryInfoMap.get(instanceId) != retryInfo) {
LOGGER.info("Server: {} has been marked healthy, skipping the retry", instanceId);
continue;
}
if (retryInfo._numRetires == _maxRetries) {
LOGGER.warn("Unhealthy server: {} already reaches the max retries: {}, do not retry again and treat it "
+ "as healthy so that the listeners do not lose track of the server", instanceId, _maxRetries);
markServerHealthy(instanceId);
continue;
}
LOGGER.info("Retry unhealthy server: {}", instanceId);
for (Listener listener : _listeners) {
listener.retryUnhealthyServer(instanceId, this);
}
//noinspection BusyWait
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(earliestRetryTimeNs - System.nanoTime()));
// Update the retry info and add it back to the delay queue
retryInfo._retryDelayNs = (long) (retryInfo._retryDelayNs * _retryDelayFactor);
retryInfo._retryTimeNs = System.nanoTime() + retryInfo._retryDelayNs;
retryInfo._numRetires++;
_retryInfoDelayQueue.offer(retryInfo);
} catch (Exception e) {
if (_running) {
_logger.error("Caught exception in the retry thread, continuing with errors", e);
LOGGER.error("Caught exception in the retry thread, continuing with errors", e);
}
}
}
@@ -119,24 +117,28 @@ public void start() {

@Override
public void markServerHealthy(String instanceId) {
if (_unhealthyServerRetryInfoMap.remove(instanceId) != null) {
_logger.info("Mark server: {} as healthy", instanceId);
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size());
_unhealthyServerRetryInfoMap.computeIfPresent(instanceId, (id, retryInfo) -> {
LOGGER.info("Mark server: {} as healthy", instanceId);
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size() - 1);
for (Listener listener : _listeners) {
listener.notifyHealthyServer(instanceId, this);
}
}
return null;
});
}

@Override
public void markServerUnhealthy(String instanceId) {
if (_unhealthyServerRetryInfoMap.putIfAbsent(instanceId, new RetryInfo()) == null) {
_logger.warn("Mark server: {} as unhealthy", instanceId);
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size());
_unhealthyServerRetryInfoMap.computeIfAbsent(instanceId, id -> {
LOGGER.warn("Mark server: {} as unhealthy", instanceId);
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size() + 1);
for (Listener listener : _listeners) {
listener.notifyUnhealthyServer(instanceId, this);
}
}
RetryInfo retryInfo = new RetryInfo(id);
_retryInfoDelayQueue.offer(retryInfo);
return retryInfo;
});
}
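markServerHealthy/markServerUnhealthy now funnel the map update, the gauge update and the listener callbacks through computeIfPresent/computeIfAbsent, so ConcurrentHashMap runs them atomically per key and concurrent callers cannot double-count a server. A minimal sketch of the same pattern, with a plain AtomicInteger standing in for the broker gauge and println for the listener callbacks (both are placeholders, not Pinot types):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMarkSketch {
  private final ConcurrentHashMap<String, Object> _unhealthy = new ConcurrentHashMap<>();
  private final AtomicInteger _gauge = new AtomicInteger(); // stand-in for BrokerGauge.UNHEALTHY_SERVERS

  void markUnhealthy(String instanceId) {
    // The mapping function runs at most once per key, so the gauge bump and the
    // "unhealthy" notification cannot be issued twice for the same server.
    _unhealthy.computeIfAbsent(instanceId, id -> {
      _gauge.incrementAndGet();
      System.out.println("notify unhealthy: " + id);
      return new Object();
    });
  }

  void markHealthy(String instanceId) {
    // computeIfPresent only runs if the server is currently tracked, and returning
    // null removes the entry in the same atomic step.
    _unhealthy.computeIfPresent(instanceId, (id, info) -> {
      _gauge.decrementAndGet();
      System.out.println("notify healthy: " + id);
      return null;
    });
  }
}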

@Override
@@ -146,7 +148,7 @@ public Set<String> getUnhealthyServers() {

@Override
public void stop() {
_logger.info("Stopping {}", _name);
LOGGER.info("Stopping {}", _name);
_running = false;

try {
@@ -160,15 +162,29 @@ public void stop() {
/**
* Encapsulates the retry related information.
*/
protected class RetryInfo {
protected class RetryInfo implements Delayed {
final String _instanceId;

long _retryTimeNs;
long _retryDelayNs;
int _numRetires;

RetryInfo() {
RetryInfo(String instanceId) {
_instanceId = instanceId;
_retryTimeNs = System.nanoTime() + _retryInitialDelayNs;
_retryDelayNs = _retryInitialDelayNs;
_numRetires = 0;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(_retryTimeNs - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed o) {
RetryInfo that = (RetryInfo) o;
return Long.compare(_retryTimeNs, that._retryTimeNs);
}
}
}
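The bigger change in this class is replacing the per-iteration scan plus Thread.sleep with a DelayQueue: the retry thread simply take()s the next RetryInfo, which blocks until that entry's retry time has passed. A self-contained sketch of the DelayQueue/Delayed contract the new RetryInfo relies on (class and field names here are illustrative):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueSketch {
  static class Deadline implements Delayed {
    final String _id;
    final long _dueNs;

    Deadline(String id, long delayMs) {
      _id = id;
      _dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
      // Remaining delay: stays positive until the deadline, so take() blocks until then.
      return unit.convert(_dueNs - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
      // Earliest deadline first, so the head of the queue is always the next item due.
      return Long.compare(_dueNs, ((Deadline) o)._dueNs);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<Deadline> queue = new DelayQueue<>();
    queue.offer(new Deadline("second", 200));
    queue.offer(new Deadline("first", 100));
    System.out.println(queue.take()._id); // "first", after ~100ms
    System.out.println(queue.take()._id); // "second", after ~200ms
  }
}

Because markServerHealthy only removes entries from the map, a stale RetryInfo can still surface from the queue; the identity check _unhealthyServerRetryInfoMap.get(instanceId) != retryInfo at the top of the retry loop is what discards it.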
@@ -21,6 +21,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,8 +41,7 @@ public static FailureDetector getFailureDetector(PinotConfiguration config, Brok
} else {
LOGGER.info("Initializing failure detector with class: {}", className);
try {
FailureDetector failureDetector =
(FailureDetector) Class.forName(className).getDeclaredConstructor().newInstance();
FailureDetector failureDetector = PluginManager.get().createInstance(className);
failureDetector.init(config, brokerMetrics);
LOGGER.info("Initialized failure detector with class: {}", className);
return failureDetector;
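The factory now instantiates the configured class through PluginManager.get().createInstance(className) instead of direct reflection, which (presumably) lets failure detectors packaged as Pinot plugins and loaded through a plugin classloader be resolved too. For comparison, a sketch of the plain-reflection form that was removed; it only sees classes on the application classpath:

// Reflection-based instantiation: resolve the class by name on the current classpath
// and invoke its no-arg constructor. Classes that live only in a plugin classloader
// would fail here with ClassNotFoundException.
@SuppressWarnings("unchecked")
static <T> T createByReflection(String className)
    throws ReflectiveOperationException {
  return (T) Class.forName(className).getDeclaredConstructor().newInstance();
}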
@@ -318,11 +318,11 @@ private static boolean isEnabledServer(ZNRecord instanceConfigZNRecord) {
instanceConfigZNRecord.getSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name()))) {
return false;
}
if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS))) {
if (Boolean.parseBoolean(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS))) {
return false;
}
//noinspection RedundantIfStatement
if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED))) {
if (Boolean.parseBoolean(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED))) {
return false;
}
return true;
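Boolean.parseBoolean(s) behaves exactly like "true".equalsIgnoreCase(s): it is case-insensitive and returns false for null, so swapping it in for the ZK fields above is a pure readability change. A quick check:

public class ParseBooleanSketch {
  public static void main(String[] args) {
    // Same semantics as "true".equalsIgnoreCase(value), including for null input.
    System.out.println(Boolean.parseBoolean("true"));   // true
    System.out.println(Boolean.parseBoolean("TRUE"));   // true
    System.out.println(Boolean.parseBoolean("yes"));    // false
    System.out.println(Boolean.parseBoolean(null));     // false
    System.out.println("true".equalsIgnoreCase(null));  // false
  }
}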
@@ -19,6 +19,7 @@
package org.apache.pinot.broker.failuredetector;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -67,42 +68,28 @@ public void testConnectionFailure() {

// No failure detection when submitting the query
_failureDetector.notifyQuerySubmitted(queryResponse);
assertTrue(_failureDetector.getUnhealthyServers().isEmpty());
assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 0);
assertEquals(_listener._notifyUnhealthyServerCalled.get(), 0);
assertEquals(_listener._notifyHealthyServerCalled.get(), 0);
verify(Collections.emptySet(), 0, 0);

// When query finishes, the failed server should be counted as unhealthy and trigger a callback
_failureDetector.notifyQueryFinished(queryResponse);
assertEquals(_failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID));
assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1);
assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1);
assertEquals(_listener._notifyHealthyServerCalled.get(), 0);
verify(Collections.singleton(INSTANCE_ID), 1, 0);

// Mark server unhealthy again should have no effect
_failureDetector.markServerUnhealthy(INSTANCE_ID);
assertEquals(_failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID));
assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1);
assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1);
assertEquals(_listener._notifyHealthyServerCalled.get(), 0);
verify(Collections.singleton(INSTANCE_ID), 1, 0);

// Mark server healthy should remove it from the unhealthy servers and trigger a callback
_failureDetector.markServerHealthy(INSTANCE_ID);
assertTrue(_failureDetector.getUnhealthyServers().isEmpty());
assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 0);
assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1);
assertEquals(_listener._notifyHealthyServerCalled.get(), 1);
verify(Collections.emptySet(), 1, 1);

_listener.reset();
}

@Test
public void testRetry() {
_failureDetector.markServerUnhealthy(INSTANCE_ID);
assertEquals(_failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID));
assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1);
assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1);
assertEquals(_listener._notifyHealthyServerCalled.get(), 0);
verify(Collections.singleton(INSTANCE_ID), 1, 0);

// Should get 10 retries in 1s, then remove the failed server from the unhealthy servers.
// Wait for up to 5s to avoid flakiness
TestUtils.waitForCondition(aVoid -> {
@@ -123,6 +110,14 @@ public void testRetry() {
_listener.reset();
}

private void verify(Set<String> expectedUnhealthyServers, int expectedNotifyUnhealthyServerCalled,
int expectedNotifyHealthyServerCalled) {
assertEquals(_failureDetector.getUnhealthyServers(), expectedUnhealthyServers);
assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), expectedUnhealthyServers.size());
assertEquals(_listener._notifyUnhealthyServerCalled.get(), expectedNotifyUnhealthyServerCalled);
assertEquals(_listener._notifyHealthyServerCalled.get(), expectedNotifyHealthyServerCalled);
}

@AfterClass
public void tearDown() {
_failureDetector.stop();
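The retry test polls with TestUtils.waitForCondition until the asynchronous retry thread has marked the server healthy again, rather than sleeping for a fixed interval. The helper's exact signature isn't shown here, so the sketch below is an assumption about its general shape, not Pinot's implementation:

import java.util.function.Function;

public final class WaitSketch {
  private WaitSketch() {
  }

  // Polls the condition until it returns true or the timeout elapses, failing the test otherwise.
  public static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
      String errorMessage) {
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadlineMs) {
      if (Boolean.TRUE.equals(condition.apply(null))) {
        return;
      }
      try {
        Thread.sleep(checkIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting", e);
      }
    }
    throw new AssertionError("Timed out waiting for condition: " + errorMessage);
  }
}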
@@ -149,10 +149,16 @@ private void markQueryFailed(long requestId, ServerRoutingInstance serverRouting
* Connects to the given server, returns {@code true} if the server is successfully connected.
*/
public boolean connect(ServerInstance serverInstance) {
if (_serverChannelsTls != null) {
return _serverChannelsTls.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, true));
} else {
return _serverChannels.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, false));
try {
if (_serverChannelsTls != null) {
_serverChannelsTls.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, true));
} else {
_serverChannels.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, false));
}
return true;
} catch (Exception e) {
LOGGER.debug("Failed to connect to server: {}", serverInstance, e);
return false;
}
}

@@ -116,11 +116,9 @@ public void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryRespon
.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes, timeoutMs);
}

/**
* Connects to the given server, returns {@code true} if the server is successfully connected.
*/
public boolean connect(ServerRoutingInstance serverRoutingInstance) {
return _serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new).connect();
public void connect(ServerRoutingInstance serverRoutingInstance)
throws InterruptedException, TimeoutException {
_serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new).connect();
}

public void shutDown() {
@@ -214,20 +212,17 @@ void sendRequestWithoutLocking(String rawTableName, AsyncQueryResponse asyncQuer
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_SENT, requestBytes.length);
}

boolean connect() {
try {
if (_channelLock.tryLock(TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
try {
connectWithoutLocking();
return true;
} finally {
_channelLock.unlock();
}
void connect()
throws InterruptedException, TimeoutException {
if (_channelLock.tryLock(TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
try {
connectWithoutLocking();
} finally {
_channelLock.unlock();
}
} catch (Exception e) {
// Ignored
} else {
throw new TimeoutException(CHANNEL_LOCK_TIMEOUT_MSG);
}
return false;
}
}
}
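ServerChannel.connect no longer swallows failures and returns a boolean; it propagates InterruptedException, and an unacquired lock is now reported as an explicit TimeoutException, leaving it to the caller (the connect(ServerInstance) wrapper earlier in this commit) to translate failures into a boolean at the API boundary. The tryLock-with-timeout pattern in isolation, with an illustrative timeout constant:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockSketch {
  private static final long LOCK_TIMEOUT_MS = 100; // illustrative value

  private final ReentrantLock _lock = new ReentrantLock();

  void runExclusively(Runnable work)
      throws InterruptedException, TimeoutException {
    // Wait a bounded amount of time for the lock; surface a timeout to the caller
    // instead of silently giving up.
    if (_lock.tryLock(LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
      try {
        work.run();
      } finally {
        _lock.unlock();
      }
    } else {
      throw new TimeoutException("Could not acquire lock within " + LOCK_TIMEOUT_MS + "ms");
    }
  }
}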
@@ -143,9 +143,6 @@ public int hashCode() {
return _instanceId.hashCode();
}

/**
* Use default format {@code Server_<hostname>_<port>} for backward-compatibility.
*/
@Override
public String toString() {
return _instanceId;
@@ -21,7 +21,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.InternetDomainName;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.spi.config.table.TableType;
@@ -113,7 +112,7 @@ public boolean equals(Object o) {
public int hashCode() {
// NOTE: Only check hostname, port and tableType for performance concern because they can identify a routing
// instance within the same query
return Objects.hash(_hostname, _port, _tableType);
return 31 * 31 * _hostname.hashCode() + 31 * Integer.hashCode(_port) + _tableType.hashCode();
}

@Override
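The hashCode change trades Objects.hash for a hand-rolled 31-based combination: Objects.hash allocates a varargs Object[] and boxes the int port on every call, which matters because the comment notes the hash is computed per routing instance within each query. The two forms give different values but comparable distribution; a sketch with illustrative values:

import java.util.Objects;

public class HashCodeSketch {
  public static void main(String[] args) {
    String hostname = "server-1"; // illustrative values, not Pinot's actual fields
    int port = 8098;
    String tableType = "OFFLINE";

    // Allocates an Object[3] and boxes the port.
    int viaObjectsHash = Objects.hash(hostname, port, tableType);

    // Allocation-free hash of equivalent quality, as used in the new code.
    int manual = 31 * 31 * hostname.hashCode() + 31 * Integer.hashCode(port) + tableType.hashCode();

    System.out.println(viaObjectsHash + " vs " + manual); // values differ; both are valid hash codes
  }
}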