diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java index 12158b3c862d..78839ad1b241 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/BaseExponentialBackoffRetryFailureDetector.java @@ -20,9 +20,10 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.BrokerGauge; @@ -39,10 +40,12 @@ */ @ThreadSafe public abstract class BaseExponentialBackoffRetryFailureDetector implements FailureDetector { - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(BaseExponentialBackoffRetryFailureDetector.class); + protected final String _name = getClass().getSimpleName(); protected final List _listeners = new ArrayList<>(); protected final ConcurrentHashMap _unhealthyServerRetryInfoMap = new ConcurrentHashMap<>(); + protected final DelayQueue _retryInfoDelayQueue = new DelayQueue<>(); protected BrokerMetrics _brokerMetrics; protected long _retryInitialDelayNs; @@ -62,8 +65,8 @@ public void init(PinotConfiguration config, BrokerMetrics brokerMetrics) { Broker.FailureDetector.DEFAULT_RETRY_DELAY_FACTOR); _maxRetries = config.getProperty(Broker.FailureDetector.CONFIG_OF_MAX_RETRIES, Broker.FailureDetector.DEFAULT_MAX_RETIRES); - _logger.info("Initialized {} with retry initial delay: {}ms, exponential backoff factor: {}, max " + "retries: {}", - _name, retryInitialDelayMs, _retryDelayFactor, _maxRetries); + LOGGER.info("Initialized {} with retry initial delay: {}ms, exponential backoff factor: {}, max retries: {}", _name, + retryInitialDelayMs, _retryDelayFactor, _maxRetries); } @Override @@ -73,41 +76,36 @@ public void register(Listener listener) { @Override public void start() { - _logger.info("Starting {}", _name); + LOGGER.info("Starting {}", _name); _running = true; _retryThread = new Thread(() -> { while (_running) { try { - long earliestRetryTimeNs = System.nanoTime() + _retryInitialDelayNs; - for (Map.Entry entry : _unhealthyServerRetryInfoMap.entrySet()) { - RetryInfo retryInfo = entry.getValue(); - if (System.nanoTime() > retryInfo._retryTimeNs) { - String instanceId = entry.getKey(); - if (retryInfo._numRetires == _maxRetries) { - _logger.warn( - "Unhealthy server: {} already reaches the max retries: {}, do not retry again and treat it as " - + "healthy so that the listeners do not lose track of the server", instanceId, _maxRetries); - markServerHealthy(instanceId); - continue; - } - _logger.info("Retry unhealthy server: {}", instanceId); - for (Listener listener : _listeners) { - listener.retryUnhealthyServer(instanceId, this); - } - // Update the retry info - retryInfo._retryDelayNs = (long) (retryInfo._retryDelayNs * _retryDelayFactor); - retryInfo._retryTimeNs = System.nanoTime() + retryInfo._retryDelayNs; - retryInfo._numRetires++; - } else { - earliestRetryTimeNs = Math.min(earliestRetryTimeNs, retryInfo._retryTimeNs); - } + RetryInfo retryInfo = _retryInfoDelayQueue.take(); + String instanceId = retryInfo._instanceId; + if (_unhealthyServerRetryInfoMap.get(instanceId) != retryInfo) { + LOGGER.info("Server: {} has been marked healthy, skipping the retry", instanceId); + continue; + } + if (retryInfo._numRetires == _maxRetries) { + LOGGER.warn("Unhealthy server: {} already reaches the max retries: {}, do not retry again and treat it " + + "as healthy so that the listeners do not lose track of the server", instanceId, _maxRetries); + markServerHealthy(instanceId); + continue; + } + LOGGER.info("Retry unhealthy server: {}", instanceId); + for (Listener listener : _listeners) { + listener.retryUnhealthyServer(instanceId, this); } - //noinspection BusyWait - Thread.sleep(TimeUnit.NANOSECONDS.toMillis(earliestRetryTimeNs - System.nanoTime())); + // Update the retry info and add it back to the delay queue + retryInfo._retryDelayNs = (long) (retryInfo._retryDelayNs * _retryDelayFactor); + retryInfo._retryTimeNs = System.nanoTime() + retryInfo._retryDelayNs; + retryInfo._numRetires++; + _retryInfoDelayQueue.offer(retryInfo); } catch (Exception e) { if (_running) { - _logger.error("Caught exception in the retry thread, continuing with errors", e); + LOGGER.error("Caught exception in the retry thread, continuing with errors", e); } } } @@ -119,24 +117,28 @@ public void start() { @Override public void markServerHealthy(String instanceId) { - if (_unhealthyServerRetryInfoMap.remove(instanceId) != null) { - _logger.info("Mark server: {} as healthy", instanceId); - _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size()); + _unhealthyServerRetryInfoMap.computeIfPresent(instanceId, (id, retryInfo) -> { + LOGGER.info("Mark server: {} as healthy", instanceId); + _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size() - 1); for (Listener listener : _listeners) { listener.notifyHealthyServer(instanceId, this); } - } + return null; + }); } @Override public void markServerUnhealthy(String instanceId) { - if (_unhealthyServerRetryInfoMap.putIfAbsent(instanceId, new RetryInfo()) == null) { - _logger.warn("Mark server: {} as unhealthy", instanceId); - _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size()); + _unhealthyServerRetryInfoMap.computeIfAbsent(instanceId, id -> { + LOGGER.warn("Mark server: {} as unhealthy", instanceId); + _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size() + 1); for (Listener listener : _listeners) { listener.notifyUnhealthyServer(instanceId, this); } - } + RetryInfo retryInfo = new RetryInfo(id); + _retryInfoDelayQueue.offer(retryInfo); + return retryInfo; + }); } @Override @@ -146,7 +148,7 @@ public Set getUnhealthyServers() { @Override public void stop() { - _logger.info("Stopping {}", _name); + LOGGER.info("Stopping {}", _name); _running = false; try { @@ -160,15 +162,29 @@ public void stop() { /** * Encapsulates the retry related information. */ - protected class RetryInfo { + protected class RetryInfo implements Delayed { + final String _instanceId; + long _retryTimeNs; long _retryDelayNs; int _numRetires; - RetryInfo() { + RetryInfo(String instanceId) { + _instanceId = instanceId; _retryTimeNs = System.nanoTime() + _retryInitialDelayNs; _retryDelayNs = _retryInitialDelayNs; _numRetires = 0; } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(System.nanoTime() - _retryTimeNs, TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed o) { + RetryInfo that = (RetryInfo) o; + return Long.compare(that._retryTimeNs, _retryTimeNs); + } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java index b8792eccaec4..718c064d08da 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/failuredetector/FailureDetectorFactory.java @@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.CommonConstants.Broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +41,7 @@ public static FailureDetector getFailureDetector(PinotConfiguration config, Brok } else { LOGGER.info("Initializing failure detector with class: {}", className); try { - FailureDetector failureDetector = - (FailureDetector) Class.forName(className).getDeclaredConstructor().newInstance(); + FailureDetector failureDetector = PluginManager.get().createInstance(className); failureDetector.init(config, brokerMetrics); LOGGER.info("Initialized failure detector with class: {}", className); return failureDetector; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 4754cdd852a3..bf420cc31cf8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -318,11 +318,11 @@ private static boolean isEnabledServer(ZNRecord instanceConfigZNRecord) { instanceConfigZNRecord.getSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name()))) { return false; } - if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS))) { + if (Boolean.parseBoolean(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS))) { return false; } //noinspection RedundantIfStatement - if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED))) { + if (Boolean.parseBoolean(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED))) { return false; } return true; diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java index 344d226f0ee9..38e92f94d472 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.broker.failuredetector; import java.util.Collections; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.common.metrics.BrokerGauge; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -67,31 +68,19 @@ public void testConnectionFailure() { // No failure detection when submitting the query _failureDetector.notifyQuerySubmitted(queryResponse); - assertTrue(_failureDetector.getUnhealthyServers().isEmpty()); - assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 0); - assertEquals(_listener._notifyUnhealthyServerCalled.get(), 0); - assertEquals(_listener._notifyHealthyServerCalled.get(), 0); + verify(Collections.emptySet(), 0, 0); // When query finishes, the failed server should be count as unhealthy and trigger a callback _failureDetector.notifyQueryFinished(queryResponse); - assertEquals(_failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID)); - assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1); - assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1); - assertEquals(_listener._notifyHealthyServerCalled.get(), 0); + verify(Collections.singleton(INSTANCE_ID), 1, 0); // Mark server unhealthy again should have no effect _failureDetector.markServerUnhealthy(INSTANCE_ID); - assertEquals(_failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID)); - assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1); - assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1); - assertEquals(_listener._notifyHealthyServerCalled.get(), 0); + verify(Collections.singleton(INSTANCE_ID), 1, 0); // Mark server healthy should remove it from the unhealthy servers and trigger a callback _failureDetector.markServerHealthy(INSTANCE_ID); - assertTrue(_failureDetector.getUnhealthyServers().isEmpty()); - assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 0); - assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1); - assertEquals(_listener._notifyHealthyServerCalled.get(), 1); + verify(Collections.emptySet(), 1, 1); _listener.reset(); } @@ -99,10 +88,8 @@ public void testConnectionFailure() { @Test public void testRetry() { _failureDetector.markServerUnhealthy(INSTANCE_ID); - assertEquals(_failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID)); - assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1); - assertEquals(_listener._notifyUnhealthyServerCalled.get(), 1); - assertEquals(_listener._notifyHealthyServerCalled.get(), 0); + verify(Collections.singleton(INSTANCE_ID), 1, 0); + // Should get 10 retries in 1s, then remove the failed server from the unhealthy servers. // Wait for up to 5s to avoid flakiness TestUtils.waitForCondition(aVoid -> { @@ -123,6 +110,14 @@ public void testRetry() { _listener.reset(); } + private void verify(Set expectedUnhealthyServers, int expectedNotifyUnhealthyServerCalled, + int expectedNotifyHealthyServerCalled) { + assertEquals(_failureDetector.getUnhealthyServers(), expectedUnhealthyServers); + assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), expectedUnhealthyServers.size()); + assertEquals(_listener._notifyUnhealthyServerCalled.get(), expectedNotifyUnhealthyServerCalled); + assertEquals(_listener._notifyHealthyServerCalled.get(), expectedNotifyHealthyServerCalled); + } + @AfterClass public void tearDown() { _failureDetector.stop(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index f7549747b5f5..680020314c59 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -149,10 +149,16 @@ private void markQueryFailed(long requestId, ServerRoutingInstance serverRouting * Connects to the given server, returns {@code true} if the server is successfully connected. */ public boolean connect(ServerInstance serverInstance) { - if (_serverChannelsTls != null) { - return _serverChannelsTls.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, true)); - } else { - return _serverChannels.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, false)); + try { + if (_serverChannelsTls != null) { + _serverChannelsTls.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, true)); + } else { + _serverChannels.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, false)); + } + return true; + } catch (Exception e) { + LOGGER.debug("Failed to connect to server: {}", serverInstance, e); + return false; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java index 5742816a6f4d..8b76b8ed2250 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java @@ -116,11 +116,9 @@ public void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryRespon .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes, timeoutMs); } - /** - * Connects to the given server, returns {@code true} if the server is successfully connected. - */ - public boolean connect(ServerRoutingInstance serverRoutingInstance) { - return _serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new).connect(); + public void connect(ServerRoutingInstance serverRoutingInstance) + throws InterruptedException, TimeoutException { + _serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new).connect(); } public void shutDown() { @@ -214,20 +212,17 @@ void sendRequestWithoutLocking(String rawTableName, AsyncQueryResponse asyncQuer _brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_SENT, requestBytes.length); } - boolean connect() { - try { - if (_channelLock.tryLock(TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { - try { - connectWithoutLocking(); - return true; - } finally { - _channelLock.unlock(); - } + void connect() + throws InterruptedException, TimeoutException { + if (_channelLock.tryLock(TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + try { + connectWithoutLocking(); + } finally { + _channelLock.unlock(); } - } catch (Exception e) { - // Ignored + } else { + throw new TimeoutException(CHANNEL_LOCK_TIMEOUT_MSG); } - return false; } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java index 086ea9a8fb0f..b5b1049f4941 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java @@ -143,9 +143,6 @@ public int hashCode() { return _instanceId.hashCode(); } - /** - * Use default format {@code Server__} for backward-compatibility. - */ @Override public String toString() { return _instanceId; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java index 66b2559d7773..9b4f9926fb5b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.net.InternetDomainName; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.spi.config.table.TableType; @@ -113,7 +112,7 @@ public boolean equals(Object o) { public int hashCode() { // NOTE: Only check hostname, port and tableType for performance concern because they can identify a routing // instance within the same query - return Objects.hash(_hostname, _port, _tableType); + return 31 * 31 * _hostname.hashCode() + 31 * Integer.hashCode(_port) + _tableType.hashCode(); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java index a3593a2b332a..9946ddfa15ad 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java @@ -52,11 +52,14 @@ public void testThreadExceptionTransfer() { // supposedly we can use TestNG's annotation like @Test(expectedExceptions = { IOException.class }) to verify // here we hope to verify deeper to make sure the thrown exception is nested inside the exception assertTrue(verifyException(() -> { - StreamingReduceService.processIterativeServerResponse(mock(StreamingReducer.class), threadPoolService, - ImmutableMap.of(routingInstance, mockedResponse), 1000, - mock(BaseReduceService.ExecutionStatsAggregator.class)); - return null; - }, cause -> cause.getMessage().contains(exceptionMessage))); + StreamingReduceService.processIterativeServerResponse(mock(StreamingReducer.class), + threadPoolService, + ImmutableMap.of(routingInstance, mockedResponse), + 1000, + mock(BaseReduceService.ExecutionStatsAggregator.class)); + return null; + }, cause -> cause.getMessage().contains(exceptionMessage)) + ); } @Test @@ -75,13 +78,17 @@ public Void answer(InvocationOnMock invocationOnMock) }); final ExecutorService threadPoolService = Executors.newFixedThreadPool(1); final ServerRoutingInstance routingInstance = new ServerRoutingInstance("localhost", 9527, TableType.OFFLINE); - // We cannot use TestNG's annotation like @Test(expectedExceptions = { IOException.class }) to verify + //We cannot use TestNG's annotation like @Test(expectedExceptions = { IOException.class }) to verify // because the Exception we hope to verify is nested inside the final exception. assertTrue(verifyException(() -> { - StreamingReduceService.processIterativeServerResponse(mock(StreamingReducer.class), threadPoolService, - ImmutableMap.of(routingInstance, mockedResponse), 10, mock(BaseReduceService.ExecutionStatsAggregator.class)); - return null; - }, (cause) -> cause instanceof TimeoutException)); + StreamingReduceService.processIterativeServerResponse(mock(StreamingReducer.class), + threadPoolService, + ImmutableMap.of(routingInstance, mockedResponse), + 10, + mock(BaseReduceService.ExecutionStatsAggregator.class)); + return null; + }, + (cause) -> cause instanceof TimeoutException)); } private static boolean verifyException(Callable verifyTarget, Predicate verifyCause) { @@ -92,7 +99,8 @@ private static boolean verifyException(Callable verifyTarget, Predicate