Skip to content

Commit

Permalink
Rewrite FailureDetector interface and implementations to also work wi…
Browse files Browse the repository at this point in the history
…th the multi-stage engine (#15005)
  • Loading branch information
yashmayya authored Feb 14, 2025
1 parent 82bdda5 commit f65f845
Show file tree
Hide file tree
Showing 22 changed files with 458 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.cursors.AbstractResponseStore;
import org.apache.pinot.common.failuredetector.FailureDetector;
import org.apache.pinot.common.failuredetector.FailureDetectorFactory;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerGauge;
Expand Down Expand Up @@ -144,6 +146,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
protected MultiStageQueryThrottler _multiStageQueryThrottler;
protected AbstractResponseStore _responseStore;
protected FailureDetector _failureDetector;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -319,14 +322,22 @@ public void start()
LOGGER.info("Initializing Broker Event Listener Factory");
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));

// Initialize the failure detector that removes servers from the broker routing table if they are not healthy
_failureDetector = FailureDetectorFactory.getFailureDetector(_brokerConf, _brokerMetrics);
_failureDetector.registerHealthyServerNotifier(
instanceId -> _routingManager.includeServerToRouting(instanceId));
_failureDetector.registerUnhealthyServerNotifier(
instanceId -> _routingManager.excludeServerFromRouting(instanceId));
_failureDetector.start();

// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache);
_accessControlFactory, _queryQuotaManager, tableCache, _failureDetector);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
Expand All @@ -337,7 +348,8 @@ public void start()
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
_failureDetector);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
Expand All @@ -350,7 +362,7 @@ public void start()
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache, _multiStageQueryThrottler);
_queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
Expand Down Expand Up @@ -613,6 +625,8 @@ public void stop() {
LOGGER.info("Stopping cluster change mediator");
_clusterChangeMediator.stop();

_failureDetector.stop();

// Delay shutdown of request handler so that the pending queries can be finished. The participant Helix manager has
// been disconnected, so instance should disappear from ExternalView soon and stop getting new queries.
long delayShutdownTimeMs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.requesthandler;

import io.grpc.ConnectivityState;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -31,6 +32,7 @@
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.failuredetector.FailureDetector;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
Expand All @@ -43,22 +45,30 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* The <code>GrpcBrokerRequestHandler</code> class communicates query request via GRPC.
*/
@ThreadSafe
public class GrpcBrokerRequestHandler extends BaseSingleStageBrokerRequestHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcBrokerRequestHandler.class);

private final StreamingReduceService _streamingReduceService;
private final PinotStreamingQueryClient _streamingQueryClient;
private final FailureDetector _failureDetector;

// TODO: Support TLS
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
FailureDetector failureDetector) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
_streamingReduceService = new StreamingReduceService(config);
_streamingQueryClient = new PinotStreamingQueryClient(GrpcConfig.buildGrpcQueryConfig(config));
_failureDetector = failureDetector;
_failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
}

@Override
Expand All @@ -81,7 +91,6 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
@Nullable Map<ServerInstance, ServerRouteInfo> realtimeRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception {
// TODO: Support failure detection
// TODO: Add servers queried/responded stats
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>();
Expand Down Expand Up @@ -112,33 +121,39 @@ private void sendRequest(long requestId, TableType tableType, BrokerRequest brok
ServerInstance serverInstance = routingEntry.getKey();
// TODO: support optional segments for GrpcQueryServer.
List<String> segments = routingEntry.getValue().getSegments();
String serverHost = serverInstance.getHostname();
int port = serverInstance.getGrpcPort();
// TODO: enable throttling on per host bases.
Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port,
new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace)
.setEnableStreaming(true).setBrokerRequest(brokerRequest).setSegments(segments).build());
responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC),
streamingResponse);
try {
Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverInstance,
new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace)
.setEnableStreaming(true).setBrokerRequest(brokerRequest).setSegments(segments).build());
responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC),
streamingResponse);
} catch (Exception e) {
LOGGER.warn("Failed to send request {} to server: {}", requestId, serverInstance.getInstanceId(), e);
_failureDetector.markServerUnhealthy(serverInstance.getInstanceId());
}
}
}

public static class PinotStreamingQueryClient {
private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new ConcurrentHashMap<>();
private final Map<String, String> _instanceIdToHostnamePortMap = new ConcurrentHashMap<>();
private final GrpcConfig _config;

public PinotStreamingQueryClient(GrpcConfig config) {
_config = config;
}

public Iterator<Server.ServerResponse> submit(String host, int port, Server.ServerRequest serverRequest) {
GrpcQueryClient client = getOrCreateGrpcQueryClient(host, port);
public Iterator<Server.ServerResponse> submit(ServerInstance serverInstance, Server.ServerRequest serverRequest) {
GrpcQueryClient client = getOrCreateGrpcQueryClient(serverInstance);
return client.submit(serverRequest);
}

private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port) {
String key = String.format("%s_%d", host, port);
return _grpcQueryClientMap.computeIfAbsent(key, k -> new GrpcQueryClient(host, port, _config));
private GrpcQueryClient getOrCreateGrpcQueryClient(ServerInstance serverInstance) {
String hostnamePort = String.format("%s_%d", serverInstance.getHostname(), serverInstance.getGrpcPort());
_instanceIdToHostnamePortMap.put(serverInstance.getInstanceId(), hostnamePort);
return _grpcQueryClientMap.computeIfAbsent(hostnamePort,
k -> new GrpcQueryClient(serverInstance.getHostname(), serverInstance.getGrpcPort(), _config));
}

public void shutdown() {
Expand All @@ -147,4 +162,26 @@ public void shutdown() {
}
}
}

/**
* Check if a server that was previously detected as unhealthy is now healthy.
*/
private boolean retryUnhealthyServer(String instanceId) {
LOGGER.info("Checking gRPC connection to unhealthy server: {}", instanceId);
ServerInstance serverInstance = _routingManager.getEnabledServerInstanceMap().get(instanceId);
if (serverInstance == null) {
LOGGER.info("Failed to find enabled server: {} in routing manager, skipping the retry", instanceId);
return false;
}

String hostnamePort = _streamingQueryClient._instanceIdToHostnamePortMap.get(instanceId);
GrpcQueryClient client = _streamingQueryClient._grpcQueryClientMap.get(hostnamePort);

if (client == null) {
LOGGER.warn("No GrpcQueryClient found for server with instanceId: {}", instanceId);
return false;
}

return client.getChannel().getState(true) == ConnectivityState.READY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.exception.QueryInfoException;
import org.apache.pinot.common.failuredetector.FailureDetector;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.response.BrokerResponse;
Expand All @@ -67,6 +68,7 @@
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.explain.AskingServerStageExplainer;
Expand All @@ -91,6 +93,10 @@
import org.slf4j.LoggerFactory;


/**
* This class serves as the broker entry-point for handling incoming multi-stage query requests and dispatching them
* to servers.
*/
public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);

Expand All @@ -104,17 +110,20 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
MultiStageQueryThrottler queryThrottler) {
MultiStageQueryThrottler queryThrottler, FailureDetector failureDetector) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(hostname, port, _routingManager);
_workerManager = new WorkerManager(_brokerId, hostname, port, _routingManager);
TlsConfig tlsConfig = config.getProperty(
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? TlsUtils.extractTlsConfig(config,
CommonConstants.Broker.BROKER_TLS_PREFIX) : null;
_queryDispatcher = new QueryDispatcher(
new MailboxService(hostname, port, config, tlsConfig), tlsConfig, this.isQueryCancellationEnabled());

failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
_queryDispatcher =
new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), tlsConfig, failureDetector,
this.isQueryCancellationEnabled());
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port,
_brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
Expand Down Expand Up @@ -538,4 +547,18 @@ private static String toSizeLimitedString(Set<String> setOfStrings, int limit) {
return setOfStrings.stream().limit(limit)
.collect(Collectors.joining(", ", "[", setOfStrings.size() > limit ? "...]" : "]"));
}

/**
* Check if a server that was previously detected as unhealthy is now healthy.
*/
public boolean retryUnhealthyServer(String instanceId) {
LOGGER.info("Checking gRPC connection to unhealthy server: {}", instanceId);
ServerInstance serverInstance = _routingManager.getEnabledServerInstanceMap().get(instanceId);
if (serverInstance == null) {
LOGGER.info("Failed to find enabled server: {} in routing manager, skipping the retry", instanceId);
return false;
}

return _queryDispatcher.checkConnectivityToInstance(instanceId);
}
}
Loading

0 comments on commit f65f845

Please sign in to comment.