diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/BaseBinaryGeoTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/BaseBinaryGeoTransformFunction.java index c528cedb51d9..a5720ff7ae76 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/BaseBinaryGeoTransformFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/BaseBinaryGeoTransformFunction.java @@ -22,13 +22,13 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.BytesUtils; import org.locationtech.jts.geom.Geometry; @@ -46,17 +46,15 @@ public abstract class BaseBinaryGeoTransformFunction extends BaseTransformFuncti private double[] _doubleResults; @Override - public void init(List arguments, Map dataSourceMap) { - Preconditions - .checkArgument(arguments.size() == 2, "2 arguments are required for transform function: %s", getName()); + public void init(List arguments, Map columnContextMap) { + Preconditions.checkArgument(arguments.size() == 2, "2 arguments are required for transform function: %s", + getName()); TransformFunction transformFunction = arguments.get(0); Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(), "First argument must be single-valued for transform function: %s", getName()); Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES || transformFunction instanceof LiteralTransformFunction, - "The first argument must be of type BYTES , but was %s", - transformFunction.getResultMetadata().getDataType() - ); + "The first argument must be of type BYTES , but was %s", transformFunction.getResultMetadata().getDataType()); if (transformFunction instanceof LiteralTransformFunction) { _firstLiteral = GeometrySerializer.deserialize( BytesUtils.toBytes(((LiteralTransformFunction) transformFunction).getLiteral())); @@ -68,9 +66,7 @@ public void init(List arguments, Map data "Second argument must be single-valued for transform function: %s", getName()); Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES || transformFunction instanceof LiteralTransformFunction, - "The second argument must be of type BYTES , but was %s", - transformFunction.getResultMetadata().getDataType() - ); + "The second argument must be of type BYTES , but was %s", transformFunction.getResultMetadata().getDataType()); if (transformFunction instanceof LiteralTransformFunction) { _secondLiteral = GeometrySerializer.deserialize( BytesUtils.toBytes(((LiteralTransformFunction) transformFunction).getLiteral())); @@ -79,29 +75,29 @@ public void init(List arguments, Map data } } - protected int[] transformGeometryToIntValuesSV(ProjectionBlock projectionBlock) { + protected int[] transformGeometryToIntValuesSV(ValueBlock valueBlock) { if (_intResults == null) { _intResults = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]; } byte[][] firstValues; byte[][] secondValues; if (_firstArgument == null && _secondArgument == null) { - _intResults = new int[Math.min(projectionBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)]; + _intResults = new int[Math.min(valueBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)]; Arrays.fill(_intResults, transformGeometryToInt(_firstLiteral, _secondLiteral)); } else if (_firstArgument == null) { - secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + secondValues = _secondArgument.transformToBytesValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _intResults[i] = transformGeometryToInt(_firstLiteral, GeometrySerializer.deserialize(secondValues[i])); } } else if (_secondArgument == null) { - firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + firstValues = _firstArgument.transformToBytesValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _intResults[i] = transformGeometryToInt(GeometrySerializer.deserialize(firstValues[i]), _secondLiteral); } } else { - firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock); - secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + firstValues = _firstArgument.transformToBytesValuesSV(valueBlock); + secondValues = _secondArgument.transformToBytesValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _intResults[i] = transformGeometryToInt(GeometrySerializer.deserialize(firstValues[i]), GeometrySerializer.deserialize(secondValues[i])); } @@ -109,29 +105,29 @@ protected int[] transformGeometryToIntValuesSV(ProjectionBlock projectionBlock) return _intResults; } - protected double[] transformGeometryToDoubleValuesSV(ProjectionBlock projectionBlock) { + protected double[] transformGeometryToDoubleValuesSV(ValueBlock valueBlock) { if (_doubleResults == null) { _doubleResults = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL]; } byte[][] firstValues; byte[][] secondValues; if (_firstArgument == null && _secondArgument == null) { - _doubleResults = new double[Math.min(projectionBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)]; + _doubleResults = new double[Math.min(valueBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)]; Arrays.fill(_doubleResults, transformGeometryToDouble(_firstLiteral, _secondLiteral)); } else if (_firstArgument == null) { - secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + secondValues = _secondArgument.transformToBytesValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _doubleResults[i] = transformGeometryToDouble(_firstLiteral, GeometrySerializer.deserialize(secondValues[i])); } } else if (_secondArgument == null) { - firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + firstValues = _firstArgument.transformToBytesValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _doubleResults[i] = transformGeometryToDouble(GeometrySerializer.deserialize(firstValues[i]), _secondLiteral); } } else { - firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock); - secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + firstValues = _firstArgument.transformToBytesValuesSV(valueBlock); + secondValues = _secondArgument.transformToBytesValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _doubleResults[i] = transformGeometryToDouble(GeometrySerializer.deserialize(firstValues[i]), GeometrySerializer.deserialize(secondValues[i])); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromTextFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromTextFunction.java index d0ff874aee3c..189aed33cf30 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromTextFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromTextFunction.java @@ -22,13 +22,13 @@ import java.util.List; import java.util.Map; import org.apache.pinot.common.Utils; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.io.ParseException; @@ -44,7 +44,7 @@ abstract class ConstructFromTextFunction extends BaseTransformFunction { protected WKTReader _reader; @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s", getName()); TransformFunction transformFunction = arguments.get(0); @@ -64,12 +64,12 @@ public TransformResultMetadata getResultMetadata() { } @Override - public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) { + public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][]; } - String[] argumentValues = _transformFunction.transformToStringValuesSV(projectionBlock); - int length = projectionBlock.getNumDocs(); + String[] argumentValues = _transformFunction.transformToStringValuesSV(valueBlock); + int length = valueBlock.getNumDocs(); for (int i = 0; i < length; i++) { try { Geometry geometry = _reader.read(argumentValues[i]); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromWKBFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromWKBFunction.java index 761fe06a9272..29ac9f8244ea 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromWKBFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromWKBFunction.java @@ -21,13 +21,13 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.BytesUtils; import org.locationtech.jts.geom.Geometry; @@ -44,7 +44,7 @@ abstract class ConstructFromWKBFunction extends BaseTransformFunction { private WKBReader _reader; @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s", getName()); TransformFunction transformFunction = arguments.get(0); @@ -64,12 +64,12 @@ public TransformResultMetadata getResultMetadata() { } @Override - public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) { + public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][]; } - byte[][] argumentValues = _transformFunction.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + byte[][] argumentValues = _transformFunction.transformToBytesValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { try { Geometry geometry = _reader.read(argumentValues[i]); _results[i] = GeometrySerializer.serialize(geometry); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/GeoToH3Function.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/GeoToH3Function.java index 75f46c7b2cce..c716b6e677b0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/GeoToH3Function.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/GeoToH3Function.java @@ -21,14 +21,14 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.locationtech.jts.geom.Geometry; @@ -51,7 +51,7 @@ public String getName() { } @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions.checkArgument(arguments.size() == 3 || arguments.size() == 2, "Transform function %s requires 2 or 3 arguments", getName()); if (arguments.size() == 3) { @@ -95,23 +95,23 @@ public TransformResultMetadata getResultMetadata() { } @Override - public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) { + public long[] transformToLongValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL]; } if (_thirdArgument == null) { - byte[][] geoValues = _firstArgument.transformToBytesValuesSV(projectionBlock); - int[] resValues = _secondArgument.transformToIntValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + byte[][] geoValues = _firstArgument.transformToBytesValuesSV(valueBlock); + int[] resValues = _secondArgument.transformToIntValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { Geometry geometry = GeometrySerializer.deserialize(geoValues[i]); _results[i] = ScalarFunctions.geoToH3(geometry.getCoordinate().x, geometry.getCoordinate().y, resValues[i]); } } else { - double[] lonValues = _firstArgument.transformToDoubleValuesSV(projectionBlock); - double[] latValues = _secondArgument.transformToDoubleValuesSV(projectionBlock); - int[] resValues = _thirdArgument.transformToIntValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + double[] lonValues = _firstArgument.transformToDoubleValuesSV(valueBlock); + double[] latValues = _secondArgument.transformToDoubleValuesSV(valueBlock); + int[] resValues = _thirdArgument.transformToIntValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _results[i] = ScalarFunctions.geoToH3(lonValues[i], latValues[i], resValues[i]); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAreaFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAreaFunction.java index 5ac874335c80..5102582024d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAreaFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAreaFunction.java @@ -21,7 +21,8 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction; @@ -29,7 +30,6 @@ import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.local.utils.GeometryUtils; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.geom.LineString; @@ -57,7 +57,7 @@ public String getName() { } @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions .checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s", getName()); TransformFunction transformFunction = arguments.get(0); @@ -76,13 +76,13 @@ public TransformResultMetadata getResultMetadata() { } @Override - public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) { + public double[] transformToDoubleValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL]; } - byte[][] values = _transformFunction.transformToBytesValuesSV(projectionBlock); - int numDocs = projectionBlock.getNumDocs(); + byte[][] values = _transformFunction.transformToBytesValuesSV(valueBlock); + int numDocs = valueBlock.getNumDocs(); for (int i = 0; i < numDocs; i++) { Geometry geometry = GeometrySerializer.deserialize(values[i]); _results[i] = GeometryUtils.isGeography(geometry) ? calculateGeographyArea(geometry) : geometry.getArea(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsBinaryFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsBinaryFunction.java index 66e399925d83..4c9da0c2aa20 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsBinaryFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsBinaryFunction.java @@ -21,14 +21,14 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.local.utils.GeometryUtils; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.locationtech.jts.geom.Geometry; @@ -48,7 +48,7 @@ public String getName() { } @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s", getName()); TransformFunction transformFunction = arguments.get(0); @@ -65,13 +65,13 @@ public TransformResultMetadata getResultMetadata() { } @Override - public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) { + public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][]; } - byte[][] values = _transformFunction.transformToBytesValuesSV(projectionBlock); + byte[][] values = _transformFunction.transformToBytesValuesSV(valueBlock); Geometry geometry; - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + for (int i = 0; i < valueBlock.getNumDocs(); i++) { geometry = GeometrySerializer.deserialize(values[i]); _results[i] = GeometryUtils.WKB_WRITER.write(geometry); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsTextFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsTextFunction.java index 4f277fb3232b..75e8f9acd4cd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsTextFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StAsTextFunction.java @@ -21,14 +21,14 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.local.utils.GeometryUtils; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.locationtech.jts.geom.Geometry; @@ -48,7 +48,7 @@ public String getName() { } @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s", getName()); TransformFunction transformFunction = arguments.get(0); @@ -65,13 +65,13 @@ public TransformResultMetadata getResultMetadata() { } @Override - public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) { + public String[] transformToStringValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL]; } - byte[][] values = _transformFunction.transformToBytesValuesSV(projectionBlock); + byte[][] values = _transformFunction.transformToBytesValuesSV(valueBlock); Geometry geometry; - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + for (int i = 0; i < valueBlock.getNumDocs(); i++) { geometry = GeometrySerializer.deserialize(values[i]); _results[i] = GeometryUtils.WKT_WRITER.write(geometry); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StContainsFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StContainsFunction.java index 210215c7e6d8..1045da58bb91 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StContainsFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StContainsFunction.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.geospatial.transform.function; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.segment.local.utils.GeometryUtils; import org.locationtech.jts.geom.Geometry; @@ -43,8 +43,8 @@ public TransformResultMetadata getResultMetadata() { } @Override - public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) { - return transformGeometryToIntValuesSV(projectionBlock); + public int[] transformToIntValuesSV(ValueBlock valueBlock) { + return transformGeometryToIntValuesSV(valueBlock); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StDistanceFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StDistanceFunction.java index a28b0ad40a91..1802d7698aca 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StDistanceFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StDistanceFunction.java @@ -19,7 +19,7 @@ package org.apache.pinot.core.geospatial.transform.function; import com.google.common.base.Preconditions; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.segment.local.utils.GeometryUtils; import org.locationtech.jts.geom.Geometry; @@ -49,8 +49,8 @@ public TransformResultMetadata getResultMetadata() { } @Override - public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) { - return transformGeometryToDoubleValuesSV(projectionBlock); + public double[] transformToDoubleValuesSV(ValueBlock valueBlock) { + return transformGeometryToDoubleValuesSV(valueBlock); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StEqualsFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StEqualsFunction.java index 45b4f269e1a6..73d5e06c2a00 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StEqualsFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StEqualsFunction.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.geospatial.transform.function; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.locationtech.jts.geom.Geometry; @@ -40,8 +40,8 @@ public TransformResultMetadata getResultMetadata() { } @Override - public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) { - return transformGeometryToIntValuesSV(projectionBlock); + public int[] transformToIntValuesSV(ValueBlock valueBlock) { + return transformGeometryToIntValuesSV(valueBlock); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StGeometryTypeFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StGeometryTypeFunction.java index 27b1a203a1ae..4fa9bf814f9f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StGeometryTypeFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StGeometryTypeFunction.java @@ -21,13 +21,13 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.data.FieldSpec; import org.locationtech.jts.geom.Geometry; @@ -46,7 +46,7 @@ public String getName() { } @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions .checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s", getName()); TransformFunction transformFunction = arguments.get(0); @@ -63,13 +63,13 @@ public TransformResultMetadata getResultMetadata() { } @Override - public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) { + public String[] transformToStringValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL]; } - byte[][] values = _transformFunction.transformToBytesValuesSV(projectionBlock); + byte[][] values = _transformFunction.transformToBytesValuesSV(valueBlock); Geometry geometry; - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + for (int i = 0; i < valueBlock.getNumDocs(); i++) { geometry = GeometrySerializer.deserialize(values[i]); _results[i] = geometry.getGeometryType(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPointFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPointFunction.java index a780c4750179..13b1e28a909e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPointFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPointFunction.java @@ -21,13 +21,13 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.Map; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.BaseTransformFunction; import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; -import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.utils.BooleanUtils; @@ -47,7 +47,7 @@ public String getName() { } @Override - public void init(List arguments, Map dataSourceMap) { + public void init(List arguments, Map columnContextMap) { Preconditions.checkArgument(arguments.size() == 2 || arguments.size() == 3, "2 or 3 arguments are required for transform function: %s", getName()); TransformFunction transformFunction = arguments.get(0); @@ -72,13 +72,13 @@ public TransformResultMetadata getResultMetadata() { } @Override - public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) { + public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][]; } - double[] firstValues = _firstArgument.transformToDoubleValuesSV(projectionBlock); - double[] secondValues = _secondArgument.transformToDoubleValuesSV(projectionBlock); - for (int i = 0; i < projectionBlock.getNumDocs(); i++) { + double[] firstValues = _firstArgument.transformToDoubleValuesSV(valueBlock); + double[] secondValues = _secondArgument.transformToDoubleValuesSV(valueBlock); + for (int i = 0; i < valueBlock.getNumDocs(); i++) { _results[i] = ScalarFunctions.stPoint(firstValues[i], secondValues[i], _isGeography); } return _results; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPolygonFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPolygonFunction.java index 6f477662d2f1..7680eafa535e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPolygonFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StPolygonFunction.java @@ -19,7 +19,7 @@ package org.apache.pinot.core.geospatial.transform.function; import com.google.common.base.Preconditions; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.local.utils.GeometryUtils; @@ -46,12 +46,12 @@ public String getName() { } @Override - public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) { + public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) { if (_results == null) { _results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][]; } - String[] argumentValues = _transformFunction.transformToStringValuesSV(projectionBlock); - int length = projectionBlock.getNumDocs(); + String[] argumentValues = _transformFunction.transformToStringValuesSV(valueBlock); + int length = valueBlock.getNumDocs(); for (int i = 0; i < length; i++) { try { Geometry geometry = _reader.read(argumentValues[i]); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StWithinFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StWithinFunction.java index 474af0d22805..60b76ac65ad6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StWithinFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/StWithinFunction.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.geospatial.transform.function; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.segment.local.utils.GeometryUtils; import org.locationtech.jts.geom.Geometry; @@ -42,8 +42,8 @@ public TransformResultMetadata getResultMetadata() { } @Override - public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) { - return transformGeometryToIntValuesSV(projectionBlock); + public int[] transformToIntValuesSV(ValueBlock valueBlock) { + return transformGeometryToIntValuesSV(valueBlock); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java new file mode 100644 index 000000000000..aaaa8cc41555 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator; + +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; + + +public abstract class BaseProjectOperator extends BaseOperator { + + /** + * Returns a map from source column name to context. + */ + public abstract Map getSourceColumnContextMap(); + + /** + * Returns the result column context. Without transform, the source and result column context are the same. + */ + public abstract ColumnContext getResultColumnContext(ExpressionContext expression); + + /** + * Returns the number of columns projected. + */ + public int getNumColumnsProjected() { + return getSourceColumnContextMap().size(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ColumnContext.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ColumnContext.java new file mode 100644 index 000000000000..878ef0b3f952 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ColumnContext.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator; + +import javax.annotation.Nullable; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.operator.transform.function.TransformFunction; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +public class ColumnContext { + private final DataType _dataType; + private final boolean _isSingleValue; + private final Dictionary _dictionary; + private final DataSource _dataSource; + + private ColumnContext(DataType dataType, boolean isSingleValue, @Nullable Dictionary dictionary, + @Nullable DataSource dataSource) { + _dataType = dataType; + _isSingleValue = isSingleValue; + _dictionary = dictionary; + _dataSource = dataSource; + } + + public DataType getDataType() { + return _dataType; + } + + public boolean isSingleValue() { + return _isSingleValue; + } + + @Nullable + public Dictionary getDictionary() { + return _dictionary; + } + + @Nullable + public DataSource getDataSource() { + return _dataSource; + } + + public static ColumnContext fromDataSource(DataSource dataSource) { + DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata(); + return new ColumnContext(dataSourceMetadata.getDataType(), dataSourceMetadata.isSingleValue(), + dataSource.getDictionary(), dataSource); + } + + public static ColumnContext fromTransformFunction(TransformFunction transformFunction) { + TransformResultMetadata resultMetadata = transformFunction.getResultMetadata(); + return new ColumnContext(resultMetadata.getDataType(), resultMetadata.isSingleValue(), + transformFunction.getDictionary(), null); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java index e0062b00d540..3a48d6ed929f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java @@ -19,9 +19,12 @@ package org.apache.pinot.core.operator; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.common.DataBlockCache; import org.apache.pinot.core.common.DataFetcher; import org.apache.pinot.core.common.Operator; @@ -31,28 +34,33 @@ import org.apache.pinot.spi.trace.Tracing; -public class ProjectionOperator extends BaseOperator { - +public class ProjectionOperator extends BaseProjectOperator { private static final String EXPLAIN_NAME = "PROJECT"; private final Map _dataSourceMap; private final BaseOperator _docIdSetOperator; private final DataBlockCache _dataBlockCache; + private final Map _columnContextMap; public ProjectionOperator(Map dataSourceMap, @Nullable BaseOperator docIdSetOperator) { _dataSourceMap = dataSourceMap; _docIdSetOperator = docIdSetOperator; _dataBlockCache = new DataBlockCache(new DataFetcher(dataSourceMap)); + _columnContextMap = new HashMap<>(HashUtil.getHashMapCapacity(dataSourceMap.size())); + dataSourceMap.forEach( + (column, dataSource) -> _columnContextMap.put(column, ColumnContext.fromDataSource(dataSource))); } - /** - * Returns the map from column to data source. - * - * @return Map from column to data source - */ - public Map getDataSourceMap() { - return _dataSourceMap; + @Override + public Map getSourceColumnContextMap() { + return _columnContextMap; + } + + @Override + public ColumnContext getResultColumnContext(ExpressionContext expression) { + assert expression.getType() == ExpressionContext.Type.IDENTIFIER; + return _columnContextMap.get(expression.getIdentifier()); } @Override @@ -70,7 +78,7 @@ protected ProjectionBlock getNextBlock() { } @Override - public List getChildOperators() { + public List> getChildOperators() { return Collections.singletonList(_docIdSetOperator); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java index ee1983d8e436..efef8693d030 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java @@ -20,7 +20,7 @@ import java.math.BigDecimal; import java.util.Map; -import org.apache.pinot.core.common.Block; +import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.DataBlockCache; import org.apache.pinot.core.operator.docvalsets.ProjectionBlockValSet; @@ -32,7 +32,7 @@ * ProjectionBlock holds a column name to Block Map. * It provides DocIdSetBlock for a given column. */ -public class ProjectionBlock implements Block { +public class ProjectionBlock implements ValueBlock { private final Map _dataSourceMap; private final DataBlockCache _dataBlockCache; @@ -41,14 +41,23 @@ public ProjectionBlock(Map dataSourceMap, DataBlockCache dat _dataBlockCache = dataBlockCache; } + @Override public int getNumDocs() { return _dataBlockCache.getNumDocs(); } + @Override public int[] getDocIds() { return _dataBlockCache.getDocIds(); } + @Override + public BlockValSet getBlockValueSet(ExpressionContext expression) { + assert expression.getType() == ExpressionContext.Type.IDENTIFIER; + return getBlockValueSet(expression.getIdentifier()); + } + + @Override public BlockValSet getBlockValueSet(String column) { return new ProjectionBlockValSet(_dataBlockCache, column, _dataSourceMap.get(column)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java index b0b5ae9e0f46..9c32bbf8a827 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java @@ -19,44 +19,47 @@ package org.apache.pinot.core.operator.blocks; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.operator.docvalsets.TransformBlockValSet; import org.apache.pinot.core.operator.transform.function.TransformFunction; /** - * Transform Block holds blocks of transformed columns. - *

In absence of transforms, it servers as a pass-through to projection block. + * The {@code TransformBlock} contains values of the transformed columns. */ -public class TransformBlock implements Block { - protected final ProjectionBlock _projectionBlock; +public class TransformBlock implements ValueBlock { + protected final ValueBlock _sourceBlock; protected final Map _transformFunctionMap; - public TransformBlock(ProjectionBlock projectionBlock, - Map transformFunctionMap) { - _projectionBlock = projectionBlock; + public TransformBlock(ValueBlock sourceBlock, Map transformFunctionMap) { + _sourceBlock = sourceBlock; _transformFunctionMap = transformFunctionMap; } + @Override public int getNumDocs() { - return _projectionBlock.getNumDocs(); + return _sourceBlock.getNumDocs(); } + @Nullable + @Override public int[] getDocIds() { - return _projectionBlock.getDocIds(); + return _sourceBlock.getDocIds(); } + @Override public BlockValSet getBlockValueSet(ExpressionContext expression) { if (expression.getType() == ExpressionContext.Type.IDENTIFIER) { - return _projectionBlock.getBlockValueSet(expression.getIdentifier()); + return _sourceBlock.getBlockValueSet(expression); } else { - return new TransformBlockValSet(_projectionBlock, _transformFunctionMap.get(expression), expression); + return new TransformBlockValSet(_sourceBlock, _transformFunctionMap.get(expression), expression); } } + @Override public BlockValSet getBlockValueSet(String column) { - return _projectionBlock.getBlockValueSet(column); + return _sourceBlock.getBlockValueSet(column); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/PassThroughTransformBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ValueBlock.java similarity index 59% rename from pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/PassThroughTransformBlock.java rename to pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ValueBlock.java index c9f7d0d7e198..a135c1edf096 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/PassThroughTransformBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ValueBlock.java @@ -18,24 +18,35 @@ */ package org.apache.pinot.core.operator.blocks; -import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.operator.transform.function.TransformFunction; /** - * Transform Block servers as a pass-through to projection block. + * The {@code ValueBlock} contains a block of values for multiple expressions. */ -public class PassThroughTransformBlock extends TransformBlock { +public interface ValueBlock extends Block { - public PassThroughTransformBlock(ProjectionBlock projectionBlock, - Map transformFunctionMap) { - super(projectionBlock, transformFunctionMap); - } + /** + * Returns the number of documents within the block. + */ + int getNumDocs(); - @Override - public BlockValSet getBlockValueSet(ExpressionContext expression) { - return _projectionBlock.getBlockValueSet(expression.getIdentifier()); - } + /** + * Returns the document ids from the segment, or {@code null} if it is not available. + */ + @Nullable + int[] getDocIds(); + + /** + * Returns the values for a given expression. + */ + BlockValSet getBlockValueSet(ExpressionContext expression); + + /** + * Returns the values for a given column (identifier). + */ + BlockValSet getBlockValueSet(String column); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java index 97a767cf19cd..7f4d3958841a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java @@ -24,7 +24,7 @@ import javax.annotation.Nullable; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; @@ -42,7 +42,7 @@ *

Caller is responsible for calling the correct method based on the data source metadata for the block value set. */ public class TransformBlockValSet implements BlockValSet { - private final ProjectionBlock _projectionBlock; + private final ValueBlock _valueBlock; private final TransformFunction _transformFunction; private final ExpressionContext _expression; @@ -51,9 +51,9 @@ public class TransformBlockValSet implements BlockValSet { private int[] _numMVEntries; - public TransformBlockValSet(ProjectionBlock projectionBlock, TransformFunction transformFunction, + public TransformBlockValSet(ValueBlock valueBlock, TransformFunction transformFunction, ExpressionContext expression) { - _projectionBlock = projectionBlock; + _valueBlock = valueBlock; _transformFunction = transformFunction; _expression = expression; } @@ -67,7 +67,7 @@ public RoaringBitmap getNullBitmap() { Set columns = new HashSet<>(); _expression.getFunction().getColumns(columns); for (String column : columns) { - BlockValSet blockValSet = _projectionBlock.getBlockValueSet(column); + BlockValSet blockValSet = _valueBlock.getBlockValueSet(column); RoaringBitmap columnNullBitmap = blockValSet.getNullBitmap(); if (columnNullBitmap != null) { if (nullBitmap == null) { @@ -110,7 +110,7 @@ public Dictionary getDictionary() { public int[] getDictionaryIdsSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.INT, true); - return _transformFunction.transformToDictIdsSV(_projectionBlock); + return _transformFunction.transformToDictIdsSV(_valueBlock); } } @@ -118,7 +118,7 @@ public int[] getDictionaryIdsSV() { public int[] getIntValuesSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.INT, true); - return _transformFunction.transformToIntValuesSV(_projectionBlock); + return _transformFunction.transformToIntValuesSV(_valueBlock); } } @@ -126,7 +126,7 @@ public int[] getIntValuesSV() { public long[] getLongValuesSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.LONG, true); - return _transformFunction.transformToLongValuesSV(_projectionBlock); + return _transformFunction.transformToLongValuesSV(_valueBlock); } } @@ -134,7 +134,7 @@ public long[] getLongValuesSV() { public float[] getFloatValuesSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.FLOAT, true); - return _transformFunction.transformToFloatValuesSV(_projectionBlock); + return _transformFunction.transformToFloatValuesSV(_valueBlock); } } @@ -142,7 +142,7 @@ public float[] getFloatValuesSV() { public double[] getDoubleValuesSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.DOUBLE, true); - return _transformFunction.transformToDoubleValuesSV(_projectionBlock); + return _transformFunction.transformToDoubleValuesSV(_valueBlock); } } @@ -150,7 +150,7 @@ public double[] getDoubleValuesSV() { public BigDecimal[] getBigDecimalValuesSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.BIG_DECIMAL, true); - return _transformFunction.transformToBigDecimalValuesSV(_projectionBlock); + return _transformFunction.transformToBigDecimalValuesSV(_valueBlock); } } @@ -158,7 +158,7 @@ public BigDecimal[] getBigDecimalValuesSV() { public String[] getStringValuesSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.STRING, true); - return _transformFunction.transformToStringValuesSV(_projectionBlock); + return _transformFunction.transformToStringValuesSV(_valueBlock); } } @@ -166,7 +166,7 @@ public String[] getStringValuesSV() { public byte[][] getBytesValuesSV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.BYTES, true); - return _transformFunction.transformToBytesValuesSV(_projectionBlock); + return _transformFunction.transformToBytesValuesSV(_valueBlock); } } @@ -174,7 +174,7 @@ public byte[][] getBytesValuesSV() { public int[][] getDictionaryIdsMV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.INT, false); - return _transformFunction.transformToDictIdsMV(_projectionBlock); + return _transformFunction.transformToDictIdsMV(_valueBlock); } } @@ -182,7 +182,7 @@ public int[][] getDictionaryIdsMV() { public int[][] getIntValuesMV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.INT, false); - return _transformFunction.transformToIntValuesMV(_projectionBlock); + return _transformFunction.transformToIntValuesMV(_valueBlock); } } @@ -190,7 +190,7 @@ public int[][] getIntValuesMV() { public long[][] getLongValuesMV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.LONG, false); - return _transformFunction.transformToLongValuesMV(_projectionBlock); + return _transformFunction.transformToLongValuesMV(_valueBlock); } } @@ -198,7 +198,7 @@ public long[][] getLongValuesMV() { public float[][] getFloatValuesMV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.FLOAT, false); - return _transformFunction.transformToFloatValuesMV(_projectionBlock); + return _transformFunction.transformToFloatValuesMV(_valueBlock); } } @@ -206,7 +206,7 @@ public float[][] getFloatValuesMV() { public double[][] getDoubleValuesMV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.DOUBLE, false); - return _transformFunction.transformToDoubleValuesMV(_projectionBlock); + return _transformFunction.transformToDoubleValuesMV(_valueBlock); } } @@ -214,7 +214,7 @@ public double[][] getDoubleValuesMV() { public String[][] getStringValuesMV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.STRING, false); - return _transformFunction.transformToStringValuesMV(_projectionBlock); + return _transformFunction.transformToStringValuesMV(_valueBlock); } } @@ -222,7 +222,7 @@ public String[][] getStringValuesMV() { public byte[][][] getBytesValuesMV() { try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { recordTransformValues(scope, DataType.BYTES, false); - return _transformFunction.transformToBytesValuesMV(_projectionBlock); + return _transformFunction.transformToBytesValuesMV(_valueBlock); } } @@ -231,7 +231,7 @@ public int[] getNumMVEntries() { if (_numMVEntries == null) { _numMVEntries = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]; } - int numDocs = _projectionBlock.getNumDocs(); + int numDocs = _valueBlock.getNumDocs(); TransformResultMetadata resultMetadata = _transformFunction.getResultMetadata(); if (resultMetadata.hasDictionary()) { int[][] dictionaryIds = getDictionaryIdsMV(); @@ -279,7 +279,7 @@ public int[] getNumMVEntries() { private void recordTransformValues(InvocationRecording recording, DataType dataType, boolean singleValue) { if (recording.isEnabled()) { - int numDocs = _projectionBlock.getNumDocs(); + int numDocs = _valueBlock.getNumDocs(); recording.setNumDocsScanned(numDocs); recording.setFunctionName(_transformFunction.getName()); recording.setOutputDataType(dataType, singleValue); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java index 50e8ebb7e749..9001e48fe24a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java @@ -26,7 +26,9 @@ import java.util.Set; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.predicate.Predicate; +import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.ColumnContext; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.ExpressionDocIdSet; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; @@ -49,15 +51,18 @@ public class ExpressionFilterOperator extends BaseFilterOperator { public ExpressionFilterOperator(IndexSegment segment, QueryContext queryContext, Predicate predicate, int numDocs) { _numDocs = numDocs; - _dataSourceMap = new HashMap<>(); Set columns = new HashSet<>(); ExpressionContext lhs = predicate.getLhs(); lhs.getColumns(columns); - for (String column : columns) { - _dataSourceMap.put(column, segment.getDataSource(column)); - } - - _transformFunction = TransformFunctionFactory.get(lhs, _dataSourceMap, queryContext); + int mapCapacity = HashUtil.getHashMapCapacity(columns.size()); + _dataSourceMap = new HashMap<>(mapCapacity); + Map columnContextMap = new HashMap<>(mapCapacity); + columns.forEach(column -> { + DataSource dataSource = segment.getDataSource(column); + _dataSourceMap.put(column, dataSource); + columnContextMap.put(column, ColumnContext.fromDataSource(dataSource)); + }); + _transformFunction = TransformFunctionFactory.get(lhs, columnContextMap, queryContext); _predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, _transformFunction.getDictionary(), _transformFunction.getResultMetadata().getDataType()); @@ -69,7 +74,7 @@ protected FilterBlock getNextBlock() { } @Override - public List getChildOperators() { + public List> getChildOperators() { return Collections.emptyList(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java index 97d81b61cfa4..01d743b26990 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java @@ -20,12 +20,11 @@ import java.util.Collections; import java.util.List; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.AggregationExecutor; import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; @@ -40,16 +39,16 @@ public class AggregationOperator extends BaseOperator { private static final String EXPLAIN_NAME = "AGGREGATE"; private final AggregationFunction[] _aggregationFunctions; - private final TransformOperator _transformOperator; + private final BaseProjectOperator _projectOperator; private final long _numTotalDocs; private final boolean _useStarTree; private int _numDocsScanned = 0; - public AggregationOperator(AggregationFunction[] aggregationFunctions, TransformOperator transformOperator, + public AggregationOperator(AggregationFunction[] aggregationFunctions, BaseProjectOperator projectOperator, long numTotalDocs, boolean useStarTree) { _aggregationFunctions = aggregationFunctions; - _transformOperator = transformOperator; + _projectOperator = projectOperator; _numTotalDocs = numTotalDocs; _useStarTree = useStarTree; } @@ -63,10 +62,10 @@ protected AggregationResultsBlock getNextBlock() { } else { aggregationExecutor = new DefaultAggregationExecutor(_aggregationFunctions); } - TransformBlock transformBlock; - while ((transformBlock = _transformOperator.nextBlock()) != null) { - _numDocsScanned += transformBlock.getNumDocs(); - aggregationExecutor.aggregate(transformBlock); + ValueBlock valueBlock; + while ((valueBlock = _projectOperator.nextBlock()) != null) { + _numDocsScanned += valueBlock.getNumDocs(); + aggregationExecutor.aggregate(valueBlock); } // Build intermediate result block based on aggregation result from the executor @@ -74,14 +73,14 @@ protected AggregationResultsBlock getNextBlock() { } @Override - public List getChildOperators() { - return Collections.singletonList(_transformOperator); + public List> getChildOperators() { + return Collections.singletonList(_projectOperator); } @Override public ExecutionStatistics getExecutionStatistics() { - long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); - long numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected(); + long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + long numEntriesScannedPostFilter = (long) _numDocsScanned * _projectOperator.getNumColumnsProjected(); return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, _numTotalDocs); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java index 9ba1a65e86b6..30e135223ff2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java @@ -20,12 +20,11 @@ import java.util.Collections; import java.util.List; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction; import org.apache.pinot.core.query.distinct.DistinctExecutor; import org.apache.pinot.core.query.distinct.DistinctExecutorFactory; @@ -41,26 +40,26 @@ public class DistinctOperator extends BaseOperator { private final IndexSegment _indexSegment; private final DistinctAggregationFunction _distinctAggregationFunction; - private final TransformOperator _transformOperator; + private final BaseProjectOperator _projectOperator; private final DistinctExecutor _distinctExecutor; private int _numDocsScanned = 0; public DistinctOperator(IndexSegment indexSegment, DistinctAggregationFunction distinctAggregationFunction, - TransformOperator transformOperator, QueryContext queryContext) { + BaseProjectOperator projectOperator, QueryContext queryContext) { _indexSegment = indexSegment; _distinctAggregationFunction = distinctAggregationFunction; - _transformOperator = transformOperator; - _distinctExecutor = DistinctExecutorFactory.getDistinctExecutor(distinctAggregationFunction, transformOperator, + _projectOperator = projectOperator; + _distinctExecutor = DistinctExecutorFactory.getDistinctExecutor(distinctAggregationFunction, projectOperator, queryContext.isNullHandlingEnabled()); } @Override protected DistinctResultsBlock getNextBlock() { - TransformBlock transformBlock; - while ((transformBlock = _transformOperator.nextBlock()) != null) { - _numDocsScanned += transformBlock.getNumDocs(); - if (_distinctExecutor.process(transformBlock)) { + ValueBlock valueBlock; + while ((valueBlock = _projectOperator.nextBlock()) != null) { + _numDocsScanned += valueBlock.getNumDocs(); + if (_distinctExecutor.process(valueBlock)) { break; } } @@ -68,8 +67,8 @@ protected DistinctResultsBlock getNextBlock() { } @Override - public List getChildOperators() { - return Collections.singletonList(_transformOperator); + public List> getChildOperators() { + return Collections.singletonList(_projectOperator); } @Override @@ -79,8 +78,8 @@ public IndexSegment getIndexSegment() { @Override public ExecutionStatistics getExecutionStatistics() { - long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); - long numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected(); + long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + long numEntriesScannedPostFilter = (long) _numDocsScanned * _projectOperator.getNumColumnsProjected(); int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs(); return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, numTotalDocs); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java index f2bb79b384c5..fa11d3dc39c2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java @@ -22,12 +22,11 @@ import java.util.List; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.ColumnContext; import org.apache.pinot.core.operator.ExecutionStatistics; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; -import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.segment.spi.IndexSegment; @@ -37,25 +36,25 @@ *

NOTE: this operator short circuit underlying operators and directly returns the data schema without any rows. */ public class EmptySelectionOperator extends BaseOperator { - private static final String EXPLAIN_NAME = "SELECT_EMPTY"; + private final BaseProjectOperator _projectOperator; private final DataSchema _dataSchema; private final ExecutionStatistics _executionStatistics; - private final TransformOperator _transformOperator; public EmptySelectionOperator(IndexSegment indexSegment, List expressions, - TransformOperator transformOperator) { + BaseProjectOperator projectOperator) { + _projectOperator = projectOperator; + int numExpressions = expressions.size(); String[] columnNames = new String[numExpressions]; - _transformOperator = transformOperator; DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions]; for (int i = 0; i < numExpressions; i++) { ExpressionContext expression = expressions.get(i); - TransformResultMetadata expressionMetadata = _transformOperator.getResultMetadata(expression); columnNames[i] = expression.toString(); + ColumnContext columnContext = projectOperator.getResultColumnContext(expression); columnDataTypes[i] = - DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue()); + DataSchema.ColumnDataType.fromDataType(columnContext.getDataType(), columnContext.isSingleValue()); } _dataSchema = new DataSchema(columnNames, columnDataTypes); @@ -73,8 +72,8 @@ public String toExplainString() { } @Override - public List getChildOperators() { - return Collections.singletonList(_transformOperator); + public List> getChildOperators() { + return Collections.singletonList(_projectOperator); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java index f478f36d921b..1237013fb153 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java @@ -25,10 +25,10 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.AggregationExecutor; import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; @@ -45,19 +45,17 @@ public class FilteredAggregationOperator extends BaseOperator> _aggFunctionsWithTransformOperator; + private final List>> _projectOperators; private final long _numTotalDocs; private long _numDocsScanned; private long _numEntriesScannedInFilter; private long _numEntriesScannedPostFilter; - // We can potentially do away with aggregationFunctions parameter, but its cleaner to pass it in than to construct - // it from aggFunctionsWithTransformOperator public FilteredAggregationOperator(AggregationFunction[] aggregationFunctions, - List> aggFunctionsWithTransformOperator, long numTotalDocs) { + List>> projectOperators, long numTotalDocs) { _aggregationFunctions = aggregationFunctions; - _aggFunctionsWithTransformOperator = aggFunctionsWithTransformOperator; + _projectOperators = projectOperators; _numTotalDocs = numTotalDocs; } @@ -70,15 +68,15 @@ protected AggregationResultsBlock getNextBlock() { resultIndexMap.put(_aggregationFunctions[i], i); } - for (Pair filteredAggregation : _aggFunctionsWithTransformOperator) { - AggregationFunction[] aggregationFunctions = filteredAggregation.getLeft(); + for (Pair> pair : _projectOperators) { + AggregationFunction[] aggregationFunctions = pair.getLeft(); AggregationExecutor aggregationExecutor = new DefaultAggregationExecutor(aggregationFunctions); - TransformOperator transformOperator = filteredAggregation.getRight(); - TransformBlock transformBlock; + BaseProjectOperator projectOperator = pair.getRight(); + ValueBlock valueBlock; int numDocsScanned = 0; - while ((transformBlock = transformOperator.nextBlock()) != null) { - aggregationExecutor.aggregate(transformBlock); - numDocsScanned += transformBlock.getNumDocs(); + while ((valueBlock = projectOperator.nextBlock()) != null) { + aggregationExecutor.aggregate(valueBlock); + numDocsScanned += valueBlock.getNumDocs(); } List filteredResult = aggregationExecutor.getResult(); @@ -86,15 +84,15 @@ protected AggregationResultsBlock getNextBlock() { result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i); } _numDocsScanned += numDocsScanned; - _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); - _numEntriesScannedPostFilter += (long) numDocsScanned * transformOperator.getNumColumnsProjected(); + _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); } return new AggregationResultsBlock(_aggregationFunctions, Arrays.asList(result)); } @Override public List getChildOperators() { - return _aggFunctionsWithTransformOperator.stream().map(Pair::getRight).collect(Collectors.toList()); + return _projectOperators.stream().map(Pair::getRight).collect(Collectors.toList()); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index 7103f47f56ca..9be251ae23e7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -30,10 +30,10 @@ import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.TableResizer; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; @@ -56,7 +56,7 @@ public class FilteredGroupByOperator extends BaseOperator { private final QueryContext _queryContext; private final AggregationFunction[] _aggregationFunctions; private final ExpressionContext[] _groupByExpressions; - private final List> _aggFunctionsWithTransformOperator; + private final List>> _projectOperators; private final long _numTotalDocs; private final DataSchema _dataSchema; @@ -65,13 +65,13 @@ public class FilteredGroupByOperator extends BaseOperator { private long _numEntriesScannedPostFilter; public FilteredGroupByOperator(QueryContext queryContext, - List> aggFunctionsWithTransformOperator, long numTotalDocs) { + List>> projectOperators, long numTotalDocs) { assert queryContext.getAggregationFunctions() != null && queryContext.getFilteredAggregationFunctions() != null && queryContext.getGroupByExpressions() != null; _queryContext = queryContext; _aggregationFunctions = queryContext.getAggregationFunctions(); _groupByExpressions = queryContext.getGroupByExpressions().toArray(new ExpressionContext[0]); - _aggFunctionsWithTransformOperator = aggFunctionsWithTransformOperator; + _projectOperators = projectOperators; _numTotalDocs = numTotalDocs; // NOTE: The indexedTable expects that the data schema will have group by columns before aggregation columns @@ -86,7 +86,7 @@ public FilteredGroupByOperator(QueryContext queryContext, ExpressionContext groupByExpression = _groupByExpressions[i]; columnNames[i] = groupByExpression.toString(); columnDataTypes[i] = DataSchema.ColumnDataType.fromDataTypeSV( - aggFunctionsWithTransformOperator.get(i).getRight().getResultMetadata(groupByExpression).getDataType()); + projectOperators.get(i).getRight().getResultColumnContext(groupByExpression).getDataType()); } // Extract column names and data types for aggregation functions @@ -115,9 +115,9 @@ protected GroupByResultsBlock getNextBlock() { } GroupKeyGenerator groupKeyGenerator = null; - for (Pair filteredAggregation : _aggFunctionsWithTransformOperator) { - TransformOperator transformOperator = filteredAggregation.getRight(); - AggregationFunction[] filteredAggFunctions = filteredAggregation.getLeft(); + for (Pair> pair : _projectOperators) { + AggregationFunction[] aggregationFunctions = pair.getLeft(); + BaseProjectOperator projectOperator = pair.getRight(); // Perform aggregation group-by on all the blocks DefaultGroupByExecutor groupByExecutor; @@ -130,27 +130,27 @@ protected GroupByResultsBlock getNextBlock() { // GroupByExecutor with a pre-existing GroupKeyGenerator so that the GroupKeyGenerator can be shared across // loop iterations i.e. across all aggs. groupByExecutor = - new DefaultGroupByExecutor(_queryContext, filteredAggFunctions, _groupByExpressions, transformOperator); + new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator); groupKeyGenerator = groupByExecutor.getGroupKeyGenerator(); } else { groupByExecutor = - new DefaultGroupByExecutor(_queryContext, filteredAggFunctions, _groupByExpressions, transformOperator, + new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator, groupKeyGenerator); } int numDocsScanned = 0; - TransformBlock transformBlock; - while ((transformBlock = transformOperator.nextBlock()) != null) { - numDocsScanned += transformBlock.getNumDocs(); - groupByExecutor.process(transformBlock); + ValueBlock valueBlock; + while ((valueBlock = projectOperator.nextBlock()) != null) { + numDocsScanned += valueBlock.getNumDocs(); + groupByExecutor.process(valueBlock); } _numDocsScanned += numDocsScanned; - _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); - _numEntriesScannedPostFilter += (long) numDocsScanned * transformOperator.getNumColumnsProjected(); + _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); GroupByResultHolder[] filterGroupByResults = groupByExecutor.getGroupByResultHolders(); - for (int i = 0; i < filteredAggFunctions.length; i++) { - groupByResultHolders[resultHolderIndexMap.get(filteredAggFunctions[i])] = filterGroupByResults[i]; + for (int i = 0; i < aggregationFunctions.length; i++) { + groupByResultHolders[resultHolderIndexMap.get(aggregationFunctions[i])] = filterGroupByResults[i]; } } assert groupKeyGenerator != null; @@ -190,7 +190,7 @@ protected GroupByResultsBlock getNextBlock() { @Override public List getChildOperators() { - return _aggFunctionsWithTransformOperator.stream().map(Pair::getRight).collect(Collectors.toList()); + return _projectOperators.stream().map(Pair::getRight).collect(Collectors.toList()); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index 463cc60d4410..3465f653e9f8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -27,10 +27,10 @@ import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.TableResizer; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor; import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor; @@ -49,7 +49,7 @@ public class GroupByOperator extends BaseOperator { private final AggregationFunction[] _aggregationFunctions; private final ExpressionContext[] _groupByExpressions; - private final TransformOperator _transformOperator; + private final BaseProjectOperator _projectOperator; private final long _numTotalDocs; private final boolean _useStarTree; private final DataSchema _dataSchema; @@ -58,10 +58,10 @@ public class GroupByOperator extends BaseOperator { private int _numDocsScanned = 0; public GroupByOperator(AggregationFunction[] aggregationFunctions, ExpressionContext[] groupByExpressions, - TransformOperator transformOperator, long numTotalDocs, QueryContext queryContext, boolean useStarTree) { + BaseProjectOperator projectOperator, long numTotalDocs, QueryContext queryContext, boolean useStarTree) { _aggregationFunctions = aggregationFunctions; _groupByExpressions = groupByExpressions; - _transformOperator = transformOperator; + _projectOperator = projectOperator; _numTotalDocs = numTotalDocs; _useStarTree = useStarTree; _queryContext = queryContext; @@ -78,7 +78,7 @@ public GroupByOperator(AggregationFunction[] aggregationFunctions, ExpressionCon ExpressionContext groupByExpression = groupByExpressions[i]; columnNames[i] = groupByExpression.toString(); columnDataTypes[i] = DataSchema.ColumnDataType.fromDataTypeSV( - _transformOperator.getResultMetadata(groupByExpression).getDataType()); + _projectOperator.getResultColumnContext(groupByExpression).getDataType()); } // Extract column names and data types for aggregation functions @@ -97,14 +97,14 @@ protected GroupByResultsBlock getNextBlock() { // Perform aggregation group-by on all the blocks GroupByExecutor groupByExecutor; if (_useStarTree) { - groupByExecutor = new StarTreeGroupByExecutor(_queryContext, _groupByExpressions, _transformOperator); + groupByExecutor = new StarTreeGroupByExecutor(_queryContext, _groupByExpressions, _projectOperator); } else { - groupByExecutor = new DefaultGroupByExecutor(_queryContext, _groupByExpressions, _transformOperator); + groupByExecutor = new DefaultGroupByExecutor(_queryContext, _groupByExpressions, _projectOperator); } - TransformBlock transformBlock; - while ((transformBlock = _transformOperator.nextBlock()) != null) { - _numDocsScanned += transformBlock.getNumDocs(); - groupByExecutor.process(transformBlock); + ValueBlock valueBlock; + while ((valueBlock = _projectOperator.nextBlock()) != null) { + _numDocsScanned += valueBlock.getNumDocs(); + groupByExecutor.process(valueBlock); } // Check if the groups limit is reached @@ -136,13 +136,13 @@ protected GroupByResultsBlock getNextBlock() { @Override public List getChildOperators() { - return Collections.singletonList(_transformOperator); + return Collections.singletonList(_projectOperator); } @Override public ExecutionStatistics getExecutionStatistics() { - long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); - long numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected(); + long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + long numEntriesScannedPostFilter = (long) _numDocsScanned * _projectOperator.getNumColumnsProjected(); return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, _numTotalDocs); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java index c86e5a114d15..b0904508a109 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java @@ -32,14 +32,13 @@ import org.apache.pinot.common.request.context.OrderByExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.common.RowBasedBlockValueFetcher; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.ColumnContext; import org.apache.pinot.core.operator.ExecutionStatistics; -import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; -import org.apache.pinot.core.operator.transform.TransformOperator; -import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.core.query.utils.OrderByComparatorFactory; @@ -70,23 +69,23 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator