[multistage] make leaf stage run query optimizer (#9439)
* add table info to the multistage leaf engine
* fix the Helix manager not-started issue by not acquiring the property store until start()

Co-authored-by: Rong Rong <[email protected]>
walterddr and Rong Rong authored Sep 21, 2022
1 parent 7be06af commit 0d14363
Showing 8 changed files with 102 additions and 23 deletions.
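
The second bullet is the subtle part of this change: QueryRunner now only stores the HelixManager reference in init() and defers getHelixPropertyStore() to start(), once the manager has been started. A condensed sketch of that lifecycle, with the unrelated init() arguments elided (see the QueryRunner diff below for the real signature):

import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

// Condensed sketch of the fix: acquiring the property store inside init()
// would fail because the HelixManager is not yet started/connected at that
// point, so only the reference is kept until start().
public class QueryRunner {
  private HelixManager _helixManager;
  private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;

  public void init(HelixManager helixManager) {
    _helixManager = helixManager; // keep the reference; no ZooKeeper access yet
  }

  public void start() {
    // Safe now: the enclosing server has already started the Helix manager.
    _helixPropertyStore = _helixManager.getHelixPropertyStore();
  }
}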
@@ -110,6 +110,7 @@ static Object toRexValue(FieldSpec.DataType dataType, Comparable value) {
return value == null ? 0L : ((BigDecimal) value).longValue();
case FLOAT:
return value == null ? 0f : ((BigDecimal) value).floatValue();
case BIG_DECIMAL:
case DOUBLE:
return value == null ? 0d : ((BigDecimal) value).doubleValue();
case STRING:
@@ -127,6 +128,7 @@ static FieldSpec.DataType toDataType(RelDataType type) {
return FieldSpec.DataType.LONG;
case FLOAT:
return FieldSpec.DataType.FLOAT;
case DECIMAL:
case DOUBLE:
return FieldSpec.DataType.DOUBLE;
case CHAR:
@@ -62,12 +62,23 @@ public class QueryEnvironmentTestUtils {
ImmutableMap.of("a", Lists.newArrayList("a3"), "c", Lists.newArrayList("c2", "c3"),
"d_R", Lists.newArrayList("d2"), "d_O", Lists.newArrayList("d3"));

public static final Map<String, String> TABLE_NAME_MAP;
public static final Map<String, Schema> SCHEMA_NAME_MAP;

static {
SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
SCHEMA_BUILDER = new Schema.SchemaBuilder()
.addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
.addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
.addMetric("col3", FieldSpec.DataType.INT, 0)
.setSchemaName("defaultSchemaName");
SCHEMA_NAME_MAP = ImmutableMap.of(
"a", SCHEMA_BUILDER.setSchemaName("a").build(),
"b", SCHEMA_BUILDER.setSchemaName("b").build(),
"c", SCHEMA_BUILDER.setSchemaName("c").build(),
"d", SCHEMA_BUILDER.setSchemaName("d").build());
TABLE_NAME_MAP = ImmutableMap.of("a_REALTIME", "a", "b_REALTIME", "b", "c_OFFLINE", "c",
"d_OFFLINE", "d", "d_REALTIME", "d");
}

private QueryEnvironmentTestUtils() {
@@ -76,12 +87,11 @@ private QueryEnvironmentTestUtils() {

public static TableCache mockTableCache() {
TableCache mock = mock(TableCache.class);
when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a_REALTIME", "a", "b_REALTIME", "b", "c_OFFLINE", "c",
"d_OFFLINE", "d", "d_REALTIME", "d"));
when(mock.getSchema("a")).thenReturn(SCHEMA_BUILDER.setSchemaName("a").build());
when(mock.getSchema("b")).thenReturn(SCHEMA_BUILDER.setSchemaName("b").build());
when(mock.getSchema("c")).thenReturn(SCHEMA_BUILDER.setSchemaName("c").build());
when(mock.getSchema("d")).thenReturn(SCHEMA_BUILDER.setSchemaName("d").build());
when(mock.getTableNameMap()).thenReturn(TABLE_NAME_MAP);
when(mock.getSchema(anyString())).thenAnswer(invocationOnMock -> {
String schemaName = invocationOnMock.getArgument(0);
return SCHEMA_NAME_MAP.get(schemaName);
});
return mock;
}

@@ -24,6 +24,9 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
@@ -62,6 +65,8 @@ public class QueryRunner {
// This is temporary until the two types of executors are merged.
private ServerQueryExecutorV1Impl _serverExecutor;
private WorkerQueryExecutor _workerExecutor;
private HelixManager _helixManager;
private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
private MailboxService<Mailbox.MailboxContent> _mailboxService;
private String _hostname;
private int _port;
@@ -70,11 +75,13 @@ public class QueryRunner {
* Initializes the query executor.
* <p>Should be called only once and before calling any other method.
*/
public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) {
public void init(PinotConfiguration config, InstanceDataManager instanceDataManager,
HelixManager helixManager, ServerMetrics serverMetrics) {
String instanceName = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
_hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
_port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
_helixManager = helixManager;
try {
_mailboxService = new GrpcMailboxService(_hostname, _port, config);
_serverExecutor = new ServerQueryExecutorV1Impl();
@@ -87,6 +94,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataManager,
}

public void start() {
_helixPropertyStore = _helixManager.getHelixPropertyStore();
_mailboxService.start();
_serverExecutor.start();
_workerExecutor.start();
@@ -105,7 +113,8 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorService
// and package it here for return. But we should really use a MailboxSendOperator placed directly into the
// server executor.
List<ServerQueryRequest> serverQueryRequests =
ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap);
ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap,
_helixPropertyStore);

// send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
List<BaseDataBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
@@ -18,12 +18,16 @@
*/
package org.apache.pinot.query.runtime.utils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.DataSource;
@@ -32,6 +36,7 @@
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.QuerySource;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.parser.CalciteRexExpressionParser;
@@ -44,7 +49,9 @@
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
@@ -65,25 +72,40 @@ public class ServerRequestUtils {
ImmutableList.of(PredicateComparisonRewriter.class.getName());
private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();

private ServerRequestUtils() {
// do not instantiate.
}

// TODO: This is a hack, make an actual ServerQueryRequest converter.
public static List<ServerQueryRequest> constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap) {
Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
"Server request for V2 engine should only have 1 scan table per request.");
String rawTableName = stageMetadata.getScannedTables().get(0);
Map<String, List<String>> tableToSegmentListMap = stageMetadata.getServerInstanceToSegmentsMap()
.get(distributedStagePlan.getServerInstance());
List<ServerQueryRequest> requests = new ArrayList<>();
for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
String tableType = tableEntry.getKey();
// ZkHelixPropertyStore extends ZkCacheBaseDataAccessor, so it should not cause much extra network traffic
// out of the box, but there is room to improve this:
// TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
if (TableType.OFFLINE.name().equals(tableType)) {
requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap,
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, tableConfig, schema,
stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue()));
} else if (TableType.REALTIME.name().equals(tableType)) {
requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap,
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, tableConfig, schema,
stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue()));
} else {
throw new IllegalArgumentException("Unsupported table type key: " + tableType);
@@ -93,23 +115,24 @@ public static List<ServerQueryRequest> constructServerQueryRequest(DistributedStagePlan
}

public static ServerQueryRequest constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap, TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
List<String> segmentList) {
Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema,
TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) {
InstanceRequest instanceRequest = new InstanceRequest();
instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID")));
instanceRequest.setBrokerId("unknown");
instanceRequest.setEnableTrace(false);
instanceRequest.setSearchSegments(segmentList);
instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan, tableType, timeBoundaryInfo));
instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan, tableType, tableConfig, schema,
timeBoundaryInfo));
return new ServerQueryRequest(instanceRequest, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
System.currentTimeMillis());
}

// TODO: this is a hack; creating a broker request object should not be needed because we already rewrite the
// entire query into stages.
public static BrokerRequest constructBrokerRequest(DistributedStagePlan distributedStagePlan, TableType tableType,
TimeBoundaryInfo timeBoundaryInfo) {
PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan, tableType, timeBoundaryInfo);
TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo) {
PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan, tableType, tableConfig, schema, timeBoundaryInfo);
BrokerRequest brokerRequest = new BrokerRequest();
brokerRequest.setPinotQuery(pinotQuery);
// Set table name in broker request because it is used for access control, query routing etc.
@@ -123,7 +146,7 @@ public static BrokerRequest constructBrokerRequest(DistributedStagePlan distributedStagePlan,
}

public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan, TableType tableType,
TimeBoundaryInfo timeBoundaryInfo) {
TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo) {
PinotQuery pinotQuery = new PinotQuery();
pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
pinotQuery.setExplain(false);
@@ -134,6 +157,7 @@ public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan,
for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
pinotQuery = queryRewriter.rewrite(pinotQuery);
}
QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
return pinotQuery;
}

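Net effect of the ServerRequestUtils changes above: the leaf stage now runs the same rewrite-then-optimize pipeline as the V1 engine, fed by the TableConfig and Schema fetched from the Helix property store. A minimal sketch of the tail of constructPinotQuery, with rewriteAndOptimize as a hypothetical name for those steps (QUERY_REWRITERS and QUERY_OPTIMIZER are the static fields declared in this class):

// Hypothetical helper name; the body condenses the tail of constructPinotQuery.
private static PinotQuery rewriteAndOptimize(PinotQuery pinotQuery, TableConfig tableConfig, Schema schema) {
  for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
    pinotQuery = queryRewriter.rewrite(pinotQuery);
  }
  // New in this commit: table-config- and schema-aware optimization now runs
  // on the leaf stage too, instead of only on the V1 broker request path.
  QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
  return pinotQuery;
}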
@@ -28,8 +28,12 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.query.runtime.QueryRunner;
@@ -50,8 +54,11 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -75,6 +82,8 @@ public class QueryServerEnclosure {
private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5;
private static final String[] STRING_FIELD_LIST = new String[]{"foo", "bar", "alice", "bob", "charlie"};
private static final int[] INT_FIELD_LIST = new int[]{1, 42};
private static final String TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE/";
private static final String SCHEMAS_PREFIX = "/SCHEMAS/";

private final ExecutorService _testExecutor;
private final int _queryRunnerPort;
@@ -84,6 +93,7 @@ public class QueryServerEnclosure {
private final InstanceDataManager _instanceDataManager;
private final Map<String, TableDataManager> _tableDataManagers = new HashMap<>();
private final Map<String, File> _indexDirs;
private final HelixManager _helixManager;

private QueryRunner _queryRunner;

@@ -101,6 +111,7 @@ public QueryServerEnclosure(Map<String, File> indexDirs, Map<String, List<String>>
_segmentMap.put(tableName, segmentList);
}
_instanceDataManager = mockInstanceDataManager();
_helixManager = mockHelixManager();
_queryRunnerPort = QueryEnvironmentTestUtils.getAvailablePort();
_runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _queryRunnerPort);
_runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
@@ -113,6 +124,25 @@ public QueryServerEnclosure(Map<String, File> indexDirs, Map<String, List<String>>
}
}

private HelixManager mockHelixManager() {
ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore = mock(ZkHelixPropertyStore.class);
when(zkHelixPropertyStore.get(anyString(), any(), anyInt())).thenAnswer(invocationOnMock -> {
String path = invocationOnMock.getArgument(0);
if (path.startsWith(TABLE_CONFIGS_PREFIX)) {
// TODO: add table config mock.
return null;
} else if (path.startsWith(SCHEMAS_PREFIX)) {
String tableName = TableNameBuilder.extractRawTableName(path.substring(SCHEMAS_PREFIX.length()));
return SchemaUtils.toZNRecord(QueryEnvironmentTestUtils.SCHEMA_NAME_MAP.get(tableName));
} else {
return null;
}
});
HelixManager helixManager = mock(HelixManager.class);
when(helixManager.getHelixPropertyStore()).thenReturn(zkHelixPropertyStore);
return helixManager;
}

private ServerMetrics mockServiceMetrics() {
return mock(ServerMetrics.class);
}
@@ -186,7 +216,7 @@ public void start()
throws Exception {
PinotConfiguration configuration = new PinotConfiguration(_runnerConfig);
_queryRunner = new QueryRunner();
_queryRunner.init(configuration, _instanceDataManager, mockServiceMetrics());
_queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics());
_queryRunner.start();
}

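For reference, a short usage sketch of the mock above. It assumes the path layout ZKMetadataProvider reads from (table configs under /CONFIGS/TABLE/<tableNameWithType>, schemas under /SCHEMAS/<schemaName>, matching the two prefixes defined in this class) and org.apache.helix.AccessOption for the read options:

// Illustrative only: what the stubbed get(anyString(), any(), anyInt()) serves.
ZkHelixPropertyStore<ZNRecord> store = mockHelixManager().getHelixPropertyStore();
// Schema reads round-trip through SchemaUtils.toZNRecord / fromZNRecord:
ZNRecord schemaRecord = store.get("/SCHEMAS/a", null, AccessOption.PERSISTENT);
Schema schemaA = SchemaUtils.fromZNRecord(schemaRecord); // the schema named "a"
// Table-config reads return null until the TODO in mockHelixManager() is done:
ZNRecord tableConfigRecord = store.get("/CONFIGS/TABLE/a_REALTIME", null, AccessOption.PERSISTENT);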
@@ -146,7 +146,7 @@ private Object[][] provideTestSql() {
new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20"},

// No match filter
new Object[]{"SELECT * FROM b WHERE col3 < 0"},
new Object[]{"SELECT * FROM b WHERE col3 < 0.5"},

// Hybrid table
new Object[]{"SELECT * FROM d"},
@@ -116,7 +116,8 @@ public ServerInstance(ServerConf serverConf, HelixManager helixManager, AccessControlFactory

if (serverConf.isMultiStageServerEnabled()) {
LOGGER.info("Initializing Multi-stage query engine");
_workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, _serverMetrics);
_workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, helixManager,
_serverMetrics);
} else {
_workerQueryServer = null;
}
@@ -20,6 +20,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -37,21 +38,23 @@ public class WorkerQueryServer {
private final ExecutorService _executor;
private final int _queryServicePort;
private final PinotConfiguration _configuration;
private final HelixManager _helixManager;

private QueryServer _queryWorkerService;
private QueryRunner _queryRunner;
private InstanceDataManager _instanceDataManager;
private ServerMetrics _serverMetrics;

public WorkerQueryServer(PinotConfiguration configuration, InstanceDataManager instanceDataManager,
ServerMetrics serverMetrics) {
HelixManager helixManager, ServerMetrics serverMetrics) {
_configuration = toWorkerQueryConfig(configuration);
_helixManager = helixManager;
_instanceDataManager = instanceDataManager;
_serverMetrics = serverMetrics;
_queryServicePort =
_configuration.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_SERVER_PORT);
_queryRunner = new QueryRunner();
_queryRunner.init(_configuration, _instanceDataManager, _serverMetrics);
_queryRunner.init(_configuration, _instanceDataManager, _helixManager, _serverMetrics);
_queryWorkerService = new QueryServer(_queryServicePort, _queryRunner);
_executor = Executors.newFixedThreadPool(DEFAULT_EXECUTOR_THREAD_NUM,
new NamedThreadFactory("worker_query_server_enclosure_on_" + _queryServicePort + "_port"));
