Skip to content

Commit

Permalink
Add connection based FailureDetector (#8491)
Browse files Browse the repository at this point in the history
- Add FailureDetector module to the broker
- Add QueryResponse interface
- Add ConnectionFailureDetector to detect server connection failures
  • Loading branch information
Jackie-Jiang authored Apr 16, 2022
1 parent 6fb7e52 commit ce0a08a
Show file tree
Hide file tree
Showing 28 changed files with 1,188 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ rules:
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.nettyConnection(\\w+)\"><>(\\w+)"
name: "pinot_broker_nettyConnection_$1_$2"
cache: true
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.unhealthyServers\"><>(\\w+)"
name: "pinot_broker_unhealthyServers_$1"
cache: true
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.clusterChangeCheck\"\"><>(\\w+)"
name: "pinot_broker_clusterChangeCheck_$1"
cache: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ rules:
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.nettyConnection(\\w+)\"><>(\\w+)"
name: "pinot_broker_nettyConnection_$1_$2"
cache: true
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.unhealthyServers\"><>(\\w+)"
name: "pinot_broker_unhealthyServers_$1"
cache: true
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.clusterChangeCheck\"\"><>(\\w+)"
name: "pinot_broker_clusterChangeCheck_$1"
cache: true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.failuredetector;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* The {@code BaseExponentialBackoffRetryFailureDetector} is a base failure detector implementation that retries the
* unhealthy servers with exponential increasing delays.
*/
@ThreadSafe
public abstract class BaseExponentialBackoffRetryFailureDetector implements FailureDetector {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseExponentialBackoffRetryFailureDetector.class);

protected final String _name = getClass().getSimpleName();
protected final List<Listener> _listeners = new ArrayList<>();
protected final ConcurrentHashMap<String, RetryInfo> _unhealthyServerRetryInfoMap = new ConcurrentHashMap<>();
protected final DelayQueue<RetryInfo> _retryInfoDelayQueue = new DelayQueue<>();

protected BrokerMetrics _brokerMetrics;
protected long _retryInitialDelayNs;
protected double _retryDelayFactor;
protected int _maxRetries;
protected Thread _retryThread;

protected volatile boolean _running;

@Override
public void init(PinotConfiguration config, BrokerMetrics brokerMetrics) {
_brokerMetrics = brokerMetrics;
long retryInitialDelayMs = config.getProperty(Broker.FailureDetector.CONFIG_OF_RETRY_INITIAL_DELAY_MS,
Broker.FailureDetector.DEFAULT_RETRY_INITIAL_DELAY_MS);
_retryInitialDelayNs = TimeUnit.MILLISECONDS.toNanos(retryInitialDelayMs);
_retryDelayFactor = config.getProperty(Broker.FailureDetector.CONFIG_OF_RETRY_DELAY_FACTOR,
Broker.FailureDetector.DEFAULT_RETRY_DELAY_FACTOR);
_maxRetries =
config.getProperty(Broker.FailureDetector.CONFIG_OF_MAX_RETRIES, Broker.FailureDetector.DEFAULT_MAX_RETIRES);
LOGGER.info("Initialized {} with retry initial delay: {}ms, exponential backoff factor: {}, max retries: {}", _name,
retryInitialDelayMs, _retryDelayFactor, _maxRetries);
}

@Override
public void register(Listener listener) {
_listeners.add(listener);
}

@Override
public void start() {
LOGGER.info("Starting {}", _name);
_running = true;

_retryThread = new Thread(() -> {
while (_running) {
try {
RetryInfo retryInfo = _retryInfoDelayQueue.take();
String instanceId = retryInfo._instanceId;
if (_unhealthyServerRetryInfoMap.get(instanceId) != retryInfo) {
LOGGER.info("Server: {} has been marked healthy, skipping the retry", instanceId);
continue;
}
if (retryInfo._numRetires == _maxRetries) {
LOGGER.warn("Unhealthy server: {} already reaches the max retries: {}, do not retry again and treat it "
+ "as healthy so that the listeners do not lose track of the server", instanceId, _maxRetries);
markServerHealthy(instanceId);
continue;
}
LOGGER.info("Retry unhealthy server: {}", instanceId);
for (Listener listener : _listeners) {
listener.retryUnhealthyServer(instanceId, this);
}
// Update the retry info and add it back to the delay queue
retryInfo._retryDelayNs = (long) (retryInfo._retryDelayNs * _retryDelayFactor);
retryInfo._retryTimeNs = System.nanoTime() + retryInfo._retryDelayNs;
retryInfo._numRetires++;
_retryInfoDelayQueue.offer(retryInfo);
} catch (Exception e) {
if (_running) {
LOGGER.error("Caught exception in the retry thread, continuing with errors", e);
}
}
}
});
_retryThread.setName("failure-detector-retry");
_retryThread.setDaemon(true);
_retryThread.start();
}

@Override
public void markServerHealthy(String instanceId) {
_unhealthyServerRetryInfoMap.computeIfPresent(instanceId, (id, retryInfo) -> {
LOGGER.info("Mark server: {} as healthy", instanceId);
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size() - 1);
for (Listener listener : _listeners) {
listener.notifyHealthyServer(instanceId, this);
}
return null;
});
}

@Override
public void markServerUnhealthy(String instanceId) {
_unhealthyServerRetryInfoMap.computeIfAbsent(instanceId, id -> {
LOGGER.warn("Mark server: {} as unhealthy", instanceId);
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, _unhealthyServerRetryInfoMap.size() + 1);
for (Listener listener : _listeners) {
listener.notifyUnhealthyServer(instanceId, this);
}
RetryInfo retryInfo = new RetryInfo(id);
_retryInfoDelayQueue.offer(retryInfo);
return retryInfo;
});
}

@Override
public Set<String> getUnhealthyServers() {
return _unhealthyServerRetryInfoMap.keySet();
}

@Override
public void stop() {
LOGGER.info("Stopping {}", _name);
_running = false;

try {
_retryThread.interrupt();
_retryThread.join();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for retry thread to finish", e);
}
}

/**
* Encapsulates the retry related information.
*/
protected class RetryInfo implements Delayed {
final String _instanceId;

long _retryTimeNs;
long _retryDelayNs;
int _numRetires;

RetryInfo(String instanceId) {
_instanceId = instanceId;
_retryTimeNs = System.nanoTime() + _retryInitialDelayNs;
_retryDelayNs = _retryInitialDelayNs;
_numRetires = 0;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(_retryTimeNs - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed o) {
RetryInfo that = (RetryInfo) o;
return Long.compare(_retryTimeNs, that._retryTimeNs);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.failuredetector;

import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.core.transport.QueryResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;


/**
* The {@code ConnectionFailureDetector} marks failed server (connection failure) from query response as unhealthy, and
* retries the unhealthy servers with exponential increasing delays.
*/
@ThreadSafe
public class ConnectionFailureDetector extends BaseExponentialBackoffRetryFailureDetector {

@Override
public void notifyQuerySubmitted(QueryResponse queryResponse) {
}

@Override
public void notifyQueryFinished(QueryResponse queryResponse) {
ServerRoutingInstance failedServer = queryResponse.getFailedServer();
if (failedServer != null) {
markServerUnhealthy(failedServer.getInstanceId());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.failuredetector;

import java.util.Set;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.transport.QueryResponse;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.env.PinotConfiguration;


/**
* The {@code FailureDetector} detects unhealthy servers based on the query responses. When it detects an unhealthy
* server, it will notify the listener via a callback, and schedule a delay to retry the unhealthy server later via
* another callback.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ThreadSafe
public interface FailureDetector {

/**
* Listener for the failure detector.
*/
interface Listener {

/**
* Notifies the listener of an unhealthy server.
*/
void notifyUnhealthyServer(String instanceId, FailureDetector failureDetector);

/**
* Notifies the listener to retry a previous unhealthy server.
*/
void retryUnhealthyServer(String instanceId, FailureDetector failureDetector);

/**
* Notifies the listener of a previous unhealthy server turning healthy.
*/
void notifyHealthyServer(String instanceId, FailureDetector failureDetector);
}

/**
* Initializes the failure detector.
*/
void init(PinotConfiguration config, BrokerMetrics brokerMetrics);

/**
* Registers a listener to the failure detector.
*/
void register(Listener listener);

/**
* Starts the failure detector. Listeners should be registered before starting the failure detector.
*/
void start();

/**
* Notifies the failure detector that a query is submitted.
*/
void notifyQuerySubmitted(QueryResponse queryResponse);

/**
* Notifies the failure detector that a query is finished (COMPLETED, FAILED or TIMED_OUT).
*/
void notifyQueryFinished(QueryResponse queryResponse);

/**
* Marks a server as healthy.
*/
void markServerHealthy(String instanceId);

/**
* Marks a server as unhealthy.
*/
void markServerUnhealthy(String instanceId);

/**
* Returns all the unhealthy servers.
*/
Set<String> getUnhealthyServers();

/**
* Stops the failure detector.
*/
void stop();
}
Loading

0 comments on commit ce0a08a

Please sign in to comment.