Skip to content

Commit

Permalink
Adding NoopPinotMetricFactory and corresponding changes
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Mar 1, 2022
1 parent 1c4512e commit 775ac8a
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.connector.presto;

import com.google.common.collect.ImmutableMap;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -35,15 +36,19 @@
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.connector.presto.plugin.metrics.NoopPinotMetricFactory;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME;


public class PinotScatterGatherQueryClient {
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
Expand Down Expand Up @@ -72,8 +77,7 @@ public boolean isRetriable() {
}
}

public static class PinotException
extends RuntimeException {
public static class PinotException extends RuntimeException {
private final ErrorCode _errorCode;

public PinotException(ErrorCode errorCode, String message, Throwable t) {
Expand Down Expand Up @@ -188,6 +192,8 @@ public String getSslProvider() {

public PinotScatterGatherQueryClient(Config pinotConfig) {
_prestoHostId = getDefaultPrestoId();
PinotMetricUtils.init(new PinotConfiguration(
ImmutableMap.of(CONFIG_OF_METRICS_FACTORY_CLASS_NAME, NoopPinotMetricFactory.class.getName())));
_brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
_brokerMetrics.initializeGlobalMeters();
TlsConfig tlsConfig = getTlsConfig(pinotConfig);
Expand Down Expand Up @@ -240,13 +246,8 @@ private String getDefaultPrestoId() {
return defaultBrokerId;
}

public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
String pql,
String serverHost,
List<String> segments,
long connectionTimeoutInMillis,
boolean ignoreEmptyResponses,
int pinotRetryCount) {
public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String pql, String serverHost,
List<String> segments, long connectionTimeoutInMillis, boolean ignoreEmptyResponses, int pinotRetryCount) {
BrokerRequest brokerRequest;
try {
brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(pql);
Expand All @@ -260,7 +261,7 @@ public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
new ArrayList<>(segments));

// Unfortunately the retries will all hit the same server because the routing decision has already been made by
// the pinot broker
// the pinot broker
Map<ServerInstance, DataTable> serverResponseMap = doWithRetries(pinotRetryCount, (requestId) -> {
String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
if (!_concurrentQueriesCountMap.containsKey(serverHost)) {
Expand All @@ -276,17 +277,17 @@ public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
QueryRouter nextAvailableQueryRouter = getNextAvailableQueryRouter();
if (TableNameBuilder.getTableTypeFromTableName(brokerRequest.getQuerySource().getTableName())
== TableType.REALTIME) {
asyncQueryResponse = nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest,
routingTable, connectionTimeoutInMillis);
asyncQueryResponse =
nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest, routingTable,
connectionTimeoutInMillis);
} else {
asyncQueryResponse = nextAvailableQueryRouter
.submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null, connectionTimeoutInMillis);
asyncQueryResponse =
nextAvailableQueryRouter.submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null,
connectionTimeoutInMillis);
}
Map<ServerInstance, DataTable> serverInstanceDataTableMap = gatherServerResponses(
ignoreEmptyResponses,
routingTable,
asyncQueryResponse,
brokerRequest.getQuerySource().getTableName());
Map<ServerInstance, DataTable> serverInstanceDataTableMap =
gatherServerResponses(ignoreEmptyResponses, routingTable, asyncQueryResponse,
brokerRequest.getQuerySource().getTableName());
_queryRouters.offer(nextAvailableQueryRouter);
_concurrentQueriesCountMap.get(serverHost).decrementAndGet();
return serverInstanceDataTableMap;
Expand Down Expand Up @@ -320,15 +321,15 @@ private Map<ServerInstance, DataTable> gatherServerResponses(boolean ignoreEmpty
: entry.getValue().toString();
routingTableForLogging.put(entry.getKey().toString(), valueToPrint);
});
throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String
.format("%d of %d servers responded with routing table servers: %s, query stats: %s",
throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE,
String.format("%d of %d servers responded with routing table servers: %s, query stats: %s",
queryResponses.size(), routingTable.size(), routingTableForLogging, asyncQueryResponse.getStats()));
}
}
Map<ServerInstance, DataTable> serverResponseMap = new HashMap<>();
queryResponses.entrySet().forEach(entry -> serverResponseMap.put(
new ServerInstance(new InstanceConfig(
String.format("Server_%s_%d", entry.getKey().getHostname(), entry.getKey().getPort()))),
queryResponses.entrySet().forEach(entry -> serverResponseMap.put(new ServerInstance(
new InstanceConfig(String.format("Server_%s_%d", entry.getKey().getHostname(),
entry.getKey().getPort()))),
entry.getValue().getDataTable()));
return serverResponseMap;
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pinot.connector.presto.grpc;

import java.util.HashMap;
Expand Down
Loading

0 comments on commit 775ac8a

Please sign in to comment.