Skip to content

Commit

Permalink
Introduce BaseProjectOperator and ValueBlock (#10405)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jackie-Jiang authored Mar 14, 2023
1 parent 3bad67d commit 01ff18d
Show file tree
Hide file tree
Showing 130 changed files with 2,932 additions and 2,928 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.BytesUtils;
import org.locationtech.jts.geom.Geometry;
Expand All @@ -46,17 +46,15 @@ public abstract class BaseBinaryGeoTransformFunction extends BaseTransformFuncti
private double[] _doubleResults;

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
Preconditions
.checkArgument(arguments.size() == 2, "2 arguments are required for transform function: %s", getName());
public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
Preconditions.checkArgument(arguments.size() == 2, "2 arguments are required for transform function: %s",
getName());
TransformFunction transformFunction = arguments.get(0);
Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
"First argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The first argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
"The first argument must be of type BYTES , but was %s", transformFunction.getResultMetadata().getDataType());
if (transformFunction instanceof LiteralTransformFunction) {
_firstLiteral = GeometrySerializer.deserialize(
BytesUtils.toBytes(((LiteralTransformFunction) transformFunction).getLiteral()));
Expand All @@ -68,9 +66,7 @@ public void init(List<TransformFunction> arguments, Map<String, DataSource> data
"Second argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The second argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
"The second argument must be of type BYTES , but was %s", transformFunction.getResultMetadata().getDataType());
if (transformFunction instanceof LiteralTransformFunction) {
_secondLiteral = GeometrySerializer.deserialize(
BytesUtils.toBytes(((LiteralTransformFunction) transformFunction).getLiteral()));
Expand All @@ -79,59 +75,59 @@ public void init(List<TransformFunction> arguments, Map<String, DataSource> data
}
}

/**
 * Computes one int result per document in the block by applying {@code transformGeometryToInt} to the pair of
 * geometries formed from the two arguments of this binary geo function, and returns the shared {@code _intResults}
 * buffer.
 *
 * <p>Each argument is taken either from a pre-deserialized literal ({@code _firstLiteral} / {@code _secondLiteral})
 * or from the serialized bytes produced by the corresponding argument transform function.
 *
 * <p>NOTE(review): this listing is a diff rendering without +/- markers; each {@code ProjectionBlock projectionBlock}
 * line is immediately followed by its {@code ValueBlock valueBlock} replacement.
 * NOTE(review): a null {@code _firstArgument} / {@code _secondArgument} presumably means that argument was a
 * literal (init() sets the literal field in that case) -- the else branch of init() is not visible here, confirm.
 */
protected int[] transformGeometryToIntValuesSV(ProjectionBlock projectionBlock) {
protected int[] transformGeometryToIntValuesSV(ValueBlock valueBlock) {
// Lazily allocate the result buffer on first use.
if (_intResults == null) {
_intResults = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
}
byte[][] firstValues;
byte[][] secondValues;
if (_firstArgument == null && _secondArgument == null) {
// Neither argument transform is set: compute the result once from the two literals and fill a freshly
// allocated array (sized to the number of docs, capped at MAX_DOC_PER_CALL) with that single value.
_intResults = new int[Math.min(projectionBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)];
_intResults = new int[Math.min(valueBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)];
Arrays.fill(_intResults, transformGeometryToInt(_firstLiteral, _secondLiteral));
} else if (_firstArgument == null) {
// Only the second argument is evaluated per document; the first side is the fixed literal geometry.
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
secondValues = _secondArgument.transformToBytesValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
_intResults[i] = transformGeometryToInt(_firstLiteral, GeometrySerializer.deserialize(secondValues[i]));
}
} else if (_secondArgument == null) {
// Only the first argument is evaluated per document; the second side is the fixed literal geometry.
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
firstValues = _firstArgument.transformToBytesValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
_intResults[i] = transformGeometryToInt(GeometrySerializer.deserialize(firstValues[i]), _secondLiteral);
}
} else {
// Both arguments are evaluated per document and deserialized pairwise.
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
firstValues = _firstArgument.transformToBytesValuesSV(valueBlock);
secondValues = _secondArgument.transformToBytesValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
_intResults[i] = transformGeometryToInt(GeometrySerializer.deserialize(firstValues[i]),
GeometrySerializer.deserialize(secondValues[i]));
}
}
return _intResults;
}

protected double[] transformGeometryToDoubleValuesSV(ProjectionBlock projectionBlock) {
protected double[] transformGeometryToDoubleValuesSV(ValueBlock valueBlock) {
if (_doubleResults == null) {
_doubleResults = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
}
byte[][] firstValues;
byte[][] secondValues;
if (_firstArgument == null && _secondArgument == null) {
_doubleResults = new double[Math.min(projectionBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)];
_doubleResults = new double[Math.min(valueBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)];
Arrays.fill(_doubleResults, transformGeometryToDouble(_firstLiteral, _secondLiteral));
} else if (_firstArgument == null) {
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
secondValues = _secondArgument.transformToBytesValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
_doubleResults[i] = transformGeometryToDouble(_firstLiteral, GeometrySerializer.deserialize(secondValues[i]));
}
} else if (_secondArgument == null) {
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
firstValues = _firstArgument.transformToBytesValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
_doubleResults[i] = transformGeometryToDouble(GeometrySerializer.deserialize(firstValues[i]), _secondLiteral);
}
} else {
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
firstValues = _firstArgument.transformToBytesValuesSV(valueBlock);
secondValues = _secondArgument.transformToBytesValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
_doubleResults[i] = transformGeometryToDouble(GeometrySerializer.deserialize(firstValues[i]),
GeometrySerializer.deserialize(secondValues[i]));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.Utils;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
Expand All @@ -44,7 +44,7 @@ abstract class ConstructFromTextFunction extends BaseTransformFunction {
protected WKTReader _reader;

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
Preconditions.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s",
getName());
TransformFunction transformFunction = arguments.get(0);
Expand All @@ -64,12 +64,12 @@ public TransformResultMetadata getResultMetadata() {
}

@Override
public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) {
if (_results == null) {
_results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
}
String[] argumentValues = _transformFunction.transformToStringValuesSV(projectionBlock);
int length = projectionBlock.getNumDocs();
String[] argumentValues = _transformFunction.transformToStringValuesSV(valueBlock);
int length = valueBlock.getNumDocs();
for (int i = 0; i < length; i++) {
try {
Geometry geometry = _reader.read(argumentValues[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.BytesUtils;
import org.locationtech.jts.geom.Geometry;
Expand All @@ -44,7 +44,7 @@ abstract class ConstructFromWKBFunction extends BaseTransformFunction {
private WKBReader _reader;

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
Preconditions.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s",
getName());
TransformFunction transformFunction = arguments.get(0);
Expand All @@ -64,12 +64,12 @@ public TransformResultMetadata getResultMetadata() {
}

@Override
public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) {
if (_results == null) {
_results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
}
byte[][] argumentValues = _transformFunction.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
byte[][] argumentValues = _transformFunction.transformToBytesValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
try {
Geometry geometry = _reader.read(argumentValues[i]);
_results[i] = GeometrySerializer.serialize(geometry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.locationtech.jts.geom.Geometry;

Expand All @@ -51,7 +51,7 @@ public String getName() {
}

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
Preconditions.checkArgument(arguments.size() == 3 || arguments.size() == 2,
"Transform function %s requires 2 or 3 arguments", getName());
if (arguments.size() == 3) {
Expand Down Expand Up @@ -95,23 +95,23 @@ public TransformResultMetadata getResultMetadata() {
}

@Override
public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
public long[] transformToLongValuesSV(ValueBlock valueBlock) {
if (_results == null) {
_results = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
}

if (_thirdArgument == null) {
byte[][] geoValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
int[] resValues = _secondArgument.transformToIntValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
byte[][] geoValues = _firstArgument.transformToBytesValuesSV(valueBlock);
int[] resValues = _secondArgument.transformToIntValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
Geometry geometry = GeometrySerializer.deserialize(geoValues[i]);
_results[i] = ScalarFunctions.geoToH3(geometry.getCoordinate().x, geometry.getCoordinate().y, resValues[i]);
}
} else {
double[] lonValues = _firstArgument.transformToDoubleValuesSV(projectionBlock);
double[] latValues = _secondArgument.transformToDoubleValuesSV(projectionBlock);
int[] resValues = _thirdArgument.transformToIntValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
double[] lonValues = _firstArgument.transformToDoubleValuesSV(valueBlock);
double[] latValues = _secondArgument.transformToDoubleValuesSV(valueBlock);
int[] resValues = _thirdArgument.transformToIntValuesSV(valueBlock);
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
_results[i] = ScalarFunctions.geoToH3(lonValues[i], latValues[i], resValues[i]);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.GeometryUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.LineString;
Expand Down Expand Up @@ -57,7 +57,7 @@ public String getName() {
}

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
Preconditions
.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s", getName());
TransformFunction transformFunction = arguments.get(0);
Expand All @@ -76,13 +76,13 @@ public TransformResultMetadata getResultMetadata() {
}

@Override
public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
public double[] transformToDoubleValuesSV(ValueBlock valueBlock) {
if (_results == null) {
_results = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
}

byte[][] values = _transformFunction.transformToBytesValuesSV(projectionBlock);
int numDocs = projectionBlock.getNumDocs();
byte[][] values = _transformFunction.transformToBytesValuesSV(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
Geometry geometry = GeometrySerializer.deserialize(values[i]);
_results[i] = GeometryUtils.isGeography(geometry) ? calculateGeographyArea(geometry) : geometry.getArea();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.GeometryUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.locationtech.jts.geom.Geometry;

Expand All @@ -48,7 +48,7 @@ public String getName() {
}

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
Preconditions.checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s",
getName());
TransformFunction transformFunction = arguments.get(0);
Expand All @@ -65,13 +65,13 @@ public TransformResultMetadata getResultMetadata() {
}

@Override
public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) {
if (_results == null) {
_results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
}
byte[][] values = _transformFunction.transformToBytesValuesSV(projectionBlock);
byte[][] values = _transformFunction.transformToBytesValuesSV(valueBlock);
Geometry geometry;
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
for (int i = 0; i < valueBlock.getNumDocs(); i++) {
geometry = GeometrySerializer.deserialize(values[i]);
_results[i] = GeometryUtils.WKB_WRITER.write(geometry);
}
Expand Down
Loading

0 comments on commit 01ff18d

Please sign in to comment.