diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 7f90f5692ae..67f6294c875 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -137,7 +137,6 @@ private void sendRequest(long requestId, TableType tableType, BrokerRequest brok public static class PinotServerStreamingQueryClient { private final Map _grpcQueryClientMap = new ConcurrentHashMap<>(); - private final Map _instanceIdToHostnamePortMap = new ConcurrentHashMap<>(); private final GrpcConfig _config; public PinotServerStreamingQueryClient(GrpcConfig config) { @@ -151,7 +150,6 @@ public Iterator submit(ServerInstance serverInstance, Ser private ServerGrpcQueryClient getOrCreateGrpcQueryClient(ServerInstance serverInstance) { String hostnamePort = String.format("%s_%d", serverInstance.getHostname(), serverInstance.getGrpcPort()); - _instanceIdToHostnamePortMap.put(serverInstance.getInstanceId(), hostnamePort); return _grpcQueryClientMap.computeIfAbsent(hostnamePort, k -> new ServerGrpcQueryClient(serverInstance.getHostname(), serverInstance.getGrpcPort(), _config)); } @@ -174,15 +172,15 @@ private FailureDetector.ServerState retryUnhealthyServer(String instanceId) { return FailureDetector.ServerState.UNHEALTHY; } - String hostnamePort = _streamingQueryClient._instanceIdToHostnamePortMap.get(instanceId); + String hostnamePort = String.format("%s_%d", serverInstance.getHostname(), serverInstance.getGrpcPort()); + ServerGrpcQueryClient client = _streamingQueryClient._grpcQueryClientMap.get(hostnamePort); + // Could occur if the cluster is only serving multi-stage queries - if (hostnamePort == null) { + if (client == null) { LOGGER.debug("No GrpcQueryClient found for server with instanceId: {}", instanceId); return FailureDetector.ServerState.UNKNOWN; } - ServerGrpcQueryClient client = _streamingQueryClient._grpcQueryClientMap.get(hostnamePort); - ConnectivityState connectivityState = client.getChannel().getState(true); if (connectivityState == ConnectivityState.READY) { LOGGER.info("Successfully connected to server: {}", instanceId); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 12fc754ade7..771e9783415 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -559,6 +559,6 @@ public FailureDetector.ServerState retryUnhealthyServer(String instanceId) { return FailureDetector.ServerState.UNHEALTHY; } - return _queryDispatcher.checkConnectivityToInstance(instanceId); + return _queryDispatcher.checkConnectivityToInstance(serverInstance); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 9eb0297c7e7..38d617a255b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -55,6 +55,7 @@ import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.DataBlockExtractUtils; import org.apache.pinot.core.util.trace.TracedThreadFactory; import org.apache.pinot.query.mailbox.MailboxService; @@ -104,7 +105,6 @@ public class QueryDispatcher { private final MailboxService _mailboxService; private final ExecutorService _executorService; private final Map _dispatchClientMap = new ConcurrentHashMap<>(); - private final Map _instanceIdToHostnamePortMap = new ConcurrentHashMap<>(); private final Map _timeSeriesDispatchClientMap = new ConcurrentHashMap<>(); @Nullable private final TlsConfig _tlsConfig; @@ -217,22 +217,25 @@ void submit( } } - public FailureDetector.ServerState checkConnectivityToInstance(String instanceId) { - String hostnamePort = _instanceIdToHostnamePortMap.get(instanceId); + public FailureDetector.ServerState checkConnectivityToInstance(ServerInstance serverInstance) { + String hostname = serverInstance.getHostname(); + int port = serverInstance.getQueryServicePort(); + String hostnamePort = String.format("%s_%d", hostname, port); + DispatchClient client = _dispatchClientMap.get(hostnamePort); // Could occur if the cluster is only serving single-stage queries - if (hostnamePort == null) { - LOGGER.debug("No DispatchClient found for server with instanceId: {}", instanceId); + if (client == null) { + LOGGER.debug("No DispatchClient found for server with instanceId: {}", serverInstance.getInstanceId()); return FailureDetector.ServerState.UNKNOWN; } - DispatchClient client = _dispatchClientMap.get(hostnamePort); ConnectivityState connectivityState = client.getChannel().getState(true); if (connectivityState == ConnectivityState.READY) { - LOGGER.info("Successfully connected to server: {}", instanceId); + LOGGER.info("Successfully connected to server: {}", serverInstance.getInstanceId()); return FailureDetector.ServerState.HEALTHY; } else { - LOGGER.info("Still can't connect to server: {}, current state: {}", instanceId, connectivityState); + LOGGER.info("Still can't connect to server: {}, current state: {}", serverInstance.getInstanceId(), + connectivityState); return FailureDetector.ServerState.UNHEALTHY; } } @@ -444,7 +447,6 @@ private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServer String hostname = queryServerInstance.getHostname(); int port = queryServerInstance.getQueryServicePort(); String hostnamePort = String.format("%s_%d", hostname, port); - _instanceIdToHostnamePortMap.put(queryServerInstance.getInstanceId(), hostnamePort); return _dispatchClientMap.computeIfAbsent(hostnamePort, k -> new DispatchClient(hostname, port, _tlsConfig)); } @@ -547,7 +549,6 @@ public void shutdown() { dispatchClient.getChannel().shutdown(); } _dispatchClientMap.clear(); - _instanceIdToHostnamePortMap.clear(); _mailboxService.shutdown(); _executorService.shutdown(); }