diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java index f60c4c22639e..a6fe9ee6f938 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java @@ -110,6 +110,7 @@ static Object toRexValue(FieldSpec.DataType dataType, Comparable value) { return value == null ? 0L : ((BigDecimal) value).longValue(); case FLOAT: return value == null ? 0f : ((BigDecimal) value).floatValue(); + case BIG_DECIMAL: case DOUBLE: return value == null ? 0d : ((BigDecimal) value).doubleValue(); case STRING: @@ -127,6 +128,7 @@ static FieldSpec.DataType toDataType(RelDataType type) { return FieldSpec.DataType.LONG; case FLOAT: return FieldSpec.DataType.FLOAT; + case DECIMAL: case DOUBLE: return FieldSpec.DataType.DOUBLE; case CHAR: diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java index 4ff42a370d4f..6e2776d6c72c 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java @@ -62,12 +62,23 @@ public class QueryEnvironmentTestUtils { ImmutableMap.of("a", Lists.newArrayList("a3"), "c", Lists.newArrayList("c2", "c3"), "d_R", Lists.newArrayList("d2"), "d_O", Lists.newArrayList("d3")); + public static final Map TABLE_NAME_MAP; + public static final Map SCHEMA_NAME_MAP; + static { - SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "") + SCHEMA_BUILDER = new Schema.SchemaBuilder() + .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "") .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "") .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS") .addMetric("col3", FieldSpec.DataType.INT, 0) .setSchemaName("defaultSchemaName"); + SCHEMA_NAME_MAP = ImmutableMap.of( + "a", SCHEMA_BUILDER.setSchemaName("a").build(), + "b", SCHEMA_BUILDER.setSchemaName("b").build(), + "c", SCHEMA_BUILDER.setSchemaName("c").build(), + "d", SCHEMA_BUILDER.setSchemaName("d").build()); + TABLE_NAME_MAP = ImmutableMap.of("a_REALTIME", "a", "b_REALTIME", "b", "c_OFFLINE", "c", + "d_OFFLINE", "d", "d_REALTIME", "d"); } private QueryEnvironmentTestUtils() { @@ -76,12 +87,11 @@ private QueryEnvironmentTestUtils() { public static TableCache mockTableCache() { TableCache mock = mock(TableCache.class); - when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a_REALTIME", "a", "b_REALTIME", "b", "c_OFFLINE", "c", - "d_OFFLINE", "d", "d_REALTIME", "d")); - when(mock.getSchema("a")).thenReturn(SCHEMA_BUILDER.setSchemaName("a").build()); - when(mock.getSchema("b")).thenReturn(SCHEMA_BUILDER.setSchemaName("b").build()); - when(mock.getSchema("c")).thenReturn(SCHEMA_BUILDER.setSchemaName("c").build()); - when(mock.getSchema("d")).thenReturn(SCHEMA_BUILDER.setSchemaName("d").build()); + when(mock.getTableNameMap()).thenReturn(TABLE_NAME_MAP); + when(mock.getSchema(anyString())).thenAnswer(invocationOnMock -> { + String schemaName = invocationOnMock.getArgument(0); + return SCHEMA_NAME_MAP.get(schemaName); + }); return mock; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 9419195a506f..6cca6fabb7fa 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -24,6 +24,9 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; +import org.apache.helix.HelixManager; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.utils.DataSchema; @@ -62,6 +65,8 @@ public class QueryRunner { // This is a temporary before merging the 2 type of executor. private ServerQueryExecutorV1Impl _serverExecutor; private WorkerQueryExecutor _workerExecutor; + private HelixManager _helixManager; + private ZkHelixPropertyStore _helixPropertyStore; private MailboxService _mailboxService; private String _hostname; private int _port; @@ -70,11 +75,13 @@ public class QueryRunner { * Initializes the query executor. *

Should be called only once and before calling any other method. */ - public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) { + public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, + HelixManager helixManager, ServerMetrics serverMetrics) { String instanceName = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME); _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring( CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName; _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); + _helixManager = helixManager; try { _mailboxService = new GrpcMailboxService(_hostname, _port, config); _serverExecutor = new ServerQueryExecutorV1Impl(); @@ -87,6 +94,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana } public void start() { + _helixPropertyStore = _helixManager.getHelixPropertyStore(); _mailboxService.start(); _serverExecutor.start(); _workerExecutor.start(); @@ -105,7 +113,8 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ // and package it here for return. But we should really use a MailboxSendOperator directly put into the // server executor. List serverQueryRequests = - ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap); + ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap, + _helixPropertyStore); // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key) List serverQueryResults = new ArrayList<>(serverQueryRequests.size()); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java index dc53c1f0d0e1..36b445b31080 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java @@ -18,12 +18,16 @@ */ package org.apache.pinot.query.runtime.utils; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.DataSource; @@ -32,6 +36,7 @@ import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.request.QuerySource; import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.parser.CalciteRexExpressionParser; @@ -44,7 +49,9 @@ import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.planner.stage.TableScanNode; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.FilterKind; @@ -65,6 +72,7 @@ public class ServerRequestUtils { ImmutableList.of(PredicateComparisonRewriter.class.getName()); private static final List QUERY_REWRITERS = new ArrayList<>( QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES)); + private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer(); private ServerRequestUtils() { // do not instantiate. @@ -72,18 +80,32 @@ private ServerRequestUtils() { // TODO: This is a hack, make an actual ServerQueryRequest converter. public static List constructServerQueryRequest(DistributedStagePlan distributedStagePlan, - Map requestMetadataMap) { + Map requestMetadataMap, ZkHelixPropertyStore helixPropertyStore) { StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId()); + Preconditions.checkState(stageMetadata.getScannedTables().size() == 1, + "Server request for V2 engine should only have 1 scan table per request."); + String rawTableName = stageMetadata.getScannedTables().get(0); Map> tableToSegmentListMap = stageMetadata.getServerInstanceToSegmentsMap() .get(distributedStagePlan.getServerInstance()); List requests = new ArrayList<>(); for (Map.Entry> tableEntry : tableToSegmentListMap.entrySet()) { String tableType = tableEntry.getKey(); + // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box + // network traffic. but there's chance to improve this: + // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments. if (TableType.OFFLINE.name().equals(tableType)) { - requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); + Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); + requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue())); } else if (TableType.REALTIME.name().equals(tableType)) { - requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, + TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); + Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, + TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); + requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue())); } else { throw new IllegalArgumentException("Unsupported table type key: " + tableType); @@ -93,14 +115,15 @@ public static List constructServerQueryRequest(DistributedSt } public static ServerQueryRequest constructServerQueryRequest(DistributedStagePlan distributedStagePlan, - Map requestMetadataMap, TimeBoundaryInfo timeBoundaryInfo, TableType tableType, - List segmentList) { + Map requestMetadataMap, TableConfig tableConfig, Schema schema, + TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List segmentList) { InstanceRequest instanceRequest = new InstanceRequest(); instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID"))); instanceRequest.setBrokerId("unknown"); instanceRequest.setEnableTrace(false); instanceRequest.setSearchSegments(segmentList); - instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan, tableType, timeBoundaryInfo)); + instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan, tableType, tableConfig, schema, + timeBoundaryInfo)); return new ServerQueryRequest(instanceRequest, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()); } @@ -108,8 +131,8 @@ public static ServerQueryRequest constructServerQueryRequest(DistributedStagePla // TODO: this is a hack, create a broker request object should not be needed because we rewrite the entire // query into stages already. public static BrokerRequest constructBrokerRequest(DistributedStagePlan distributedStagePlan, TableType tableType, - TimeBoundaryInfo timeBoundaryInfo) { - PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan, tableType, timeBoundaryInfo); + TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo) { + PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan, tableType, tableConfig, schema, timeBoundaryInfo); BrokerRequest brokerRequest = new BrokerRequest(); brokerRequest.setPinotQuery(pinotQuery); // Set table name in broker request because it is used for access control, query routing etc. @@ -123,7 +146,7 @@ public static BrokerRequest constructBrokerRequest(DistributedStagePlan distribu } public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan, TableType tableType, - TimeBoundaryInfo timeBoundaryInfo) { + TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo) { PinotQuery pinotQuery = new PinotQuery(); pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); pinotQuery.setExplain(false); @@ -134,6 +157,7 @@ public static PinotQuery constructPinotQuery(DistributedStagePlan distributedSta for (QueryRewriter queryRewriter : QUERY_REWRITERS) { pinotQuery = queryRewriter.rewrite(pinotQuery); } + QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema); return pinotQuery; } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java index 5ac927d733b6..d9ae5150bca3 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java @@ -28,8 +28,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; +import org.apache.helix.HelixManager; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.query.runtime.QueryRunner; @@ -50,8 +54,11 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.matches; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -75,6 +82,8 @@ public class QueryServerEnclosure { private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5; private static final String[] STRING_FIELD_LIST = new String[]{"foo", "bar", "alice", "bob", "charlie"}; private static final int[] INT_FIELD_LIST = new int[]{1, 42}; + private static final String TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE/"; + private static final String SCHEMAS_PREFIX = "/SCHEMAS/"; private final ExecutorService _testExecutor; private final int _queryRunnerPort; @@ -84,6 +93,7 @@ public class QueryServerEnclosure { private final InstanceDataManager _instanceDataManager; private final Map _tableDataManagers = new HashMap<>(); private final Map _indexDirs; + private final HelixManager _helixManager; private QueryRunner _queryRunner; @@ -101,6 +111,7 @@ public QueryServerEnclosure(Map indexDirs, Map indexDirs, Map zkHelixPropertyStore = mock(ZkHelixPropertyStore.class); + when(zkHelixPropertyStore.get(anyString(), any(), anyInt())).thenAnswer(invocationOnMock -> { + String path = invocationOnMock.getArgument(0); + if (path.startsWith(TABLE_CONFIGS_PREFIX)) { + // TODO: add table config mock. + return null; + } else if (path.startsWith(SCHEMAS_PREFIX)) { + String tableName = TableNameBuilder.extractRawTableName(path.substring(SCHEMAS_PREFIX.length())); + return SchemaUtils.toZNRecord(QueryEnvironmentTestUtils.SCHEMA_NAME_MAP.get(tableName)); + } else { + return null; + } + }); + HelixManager helixManager = mock(HelixManager.class); + when(helixManager.getHelixPropertyStore()).thenReturn(zkHelixPropertyStore); + return helixManager; + } + private ServerMetrics mockServiceMetrics() { return mock(ServerMetrics.class); } @@ -186,7 +216,7 @@ public void start() throws Exception { PinotConfiguration configuration = new PinotConfiguration(_runnerConfig); _queryRunner = new QueryRunner(); - _queryRunner.init(configuration, _instanceDataManager, mockServiceMetrics()); + _queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics()); _queryRunner.start(); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index 43ba64815d7a..5bfd3511bd46 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -146,7 +146,7 @@ private Object[][] provideTestSql() { new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20"}, // No match filter - new Object[]{"SELECT * FROM b WHERE col3 < 0"}, + new Object[]{"SELECT * FROM b WHERE col3 < 0.5"}, // Hybrid table new Object[]{"SELECT * FROM d"}, diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index 368bffb762cc..037cb5bf3b23 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -116,7 +116,8 @@ public ServerInstance(ServerConf serverConf, HelixManager helixManager, AccessCo if (serverConf.isMultiStageServerEnabled()) { LOGGER.info("Initializing Multi-stage query engine"); - _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, _serverMetrics); + _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, helixManager, + _serverMetrics); } else { _workerQueryServer = null; } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java index 711c763c38d7..b38b89093c55 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.helix.HelixManager; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.NamedThreadFactory; import org.apache.pinot.core.data.manager.InstanceDataManager; @@ -37,6 +38,7 @@ public class WorkerQueryServer { private final ExecutorService _executor; private final int _queryServicePort; private final PinotConfiguration _configuration; + private final HelixManager _helixManager; private QueryServer _queryWorkerService; private QueryRunner _queryRunner; @@ -44,14 +46,15 @@ public class WorkerQueryServer { private ServerMetrics _serverMetrics; public WorkerQueryServer(PinotConfiguration configuration, InstanceDataManager instanceDataManager, - ServerMetrics serverMetrics) { + HelixManager helixManager, ServerMetrics serverMetrics) { _configuration = toWorkerQueryConfig(configuration); + _helixManager = helixManager; _instanceDataManager = instanceDataManager; _serverMetrics = serverMetrics; _queryServicePort = _configuration.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_SERVER_PORT); _queryRunner = new QueryRunner(); - _queryRunner.init(_configuration, _instanceDataManager, _serverMetrics); + _queryRunner.init(_configuration, _instanceDataManager, _helixManager, _serverMetrics); _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner); _executor = Executors.newFixedThreadPool(DEFAULT_EXECUTOR_THREAD_NUM, new NamedThreadFactory("worker_query_server_enclosure_on_" + _queryServicePort + "_port"));