Skip to content

Commit

Permalink
Use BinaryArray to wire proto for multi-stage engine bytes literal ha…
Browse files Browse the repository at this point in the history
…ndling
  • Loading branch information
xiangfu0 committed Oct 4, 2023
1 parent c96221f commit 8f04044
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -183,12 +182,6 @@ public static Expression getLiteralExpression(byte[] value) {
return expression;
}

public static Expression getLiteralExpression(ByteString value) {
Expression expression = createNewLiteralExpression();
expression.getLiteral().setBinaryValue(value.toByteArray());
return expression;
}

public static Expression getLiteralExpression(BigDecimal value) {
Expression expression = createNewLiteralExpression();
expression.getLiteral().setBigDecimalValue(BigDecimalUtils.serialize(value));
Expand Down Expand Up @@ -221,9 +214,6 @@ public static Expression getLiteralExpression(Object object) {
if (object instanceof byte[]) {
return RequestUtils.getLiteralExpression((byte[]) object);
}
if (object instanceof ByteString) {
return RequestUtils.getLiteralExpression((ByteString) object);
}
if (object instanceof Boolean) {
return RequestUtils.getLiteralExpression(((Boolean) object).booleanValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,29 @@ public void testStUnionQuery(boolean useMultiStageQueryEngine)
+ "05e89a7503b81b64042bddabe27179cc05e89a85caafbc24042be215336deb9c05e899ba1b196104042be385c67dfe3";
Assert.assertEquals(actualResult, expectedResult);
}

@Test(dataProvider = "useV2QueryEngine")
public void testStPointWithLiteralWithV2(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);

String query =
String.format("Select "
+ "ST_Point(1,2) "
+ "FROM %s a "
+ "JOIN %s b "
+ "ON a.wkt1=b.wkt1 "
+ "LIMIT 10",
getTableName(),
getTableName());
JsonNode pinotResponse = postQuery(query);
JsonNode rows = pinotResponse.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
JsonNode record = rows.get(i);
Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(1, 2));
byte[] expectedValue = GeometrySerializer.serialize(point);
byte[] actualValue = BytesUtils.toBytes(record.get(0).asText());
assertEquals(actualValue, expectedValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.plannode.SortNode;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.SqlCompilationException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -186,12 +187,29 @@ public static Expression toExpression(RexExpression rexNode, PinotQuery pinotQue
case INPUT_REF:
return inputRefToIdentifier((RexExpression.InputRef) rexNode, pinotQuery);
case LITERAL:
return RequestUtils.getLiteralExpression(((RexExpression.Literal) rexNode).getValue());
return compileLiteralExpression(((RexExpression.Literal) rexNode).getValue());
default:
return compileFunctionExpression((RexExpression.FunctionCall) rexNode, pinotQuery);
}
}

/**
* Copy and modify from {@link RequestUtils#getLiteralExpression(Object)}.
*
*/
private static Expression compileLiteralExpression(Object object) {
if (object instanceof ByteArray) {
return getLiteralExpression((ByteArray) object);
}
return RequestUtils.getLiteralExpression(object);
}

private static Expression getLiteralExpression(ByteArray object) {
Expression expression = RequestUtils.createNewLiteralExpression();
expression.getLiteral().setBinaryValue(object.getBytes());
return expression;
}

private static Expression inputRefToIdentifier(RexExpression.InputRef inputRef, PinotQuery pinotQuery) {
List<Expression> selectList = pinotQuery.getSelectList();
return selectList.get(inputRef.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.calcite.util.Sarg;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.checkerframework.checker.nullness.qual.Nullable;


Expand Down Expand Up @@ -90,7 +91,7 @@ private static Object convertValue(ColumnDataType dataType, @Nullable Comparable
case STRING:
return ((NlsString) value).getValue();
case BYTES:
return ((ByteString) value).getBytes();
return new ByteArray(((ByteString) value).getBytes());
default:
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import com.google.protobuf.ByteString;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.spi.utils.ByteArray;


/**
Expand Down Expand Up @@ -129,8 +129,8 @@ private static Plan.LiteralField stringField(String val) {
return Plan.LiteralField.newBuilder().setStringField(val).build();
}

private static Plan.LiteralField bytesField(ByteString val) {
return Plan.LiteralField.newBuilder().setBytesField(val).build();
private static Plan.LiteralField bytesField(ByteArray val) {
return Plan.LiteralField.newBuilder().setBytesField(ByteString.copyFrom(val.getBytes())).build();
}

private static Plan.MemberVariableField serializeMemberVariable(Object fieldObject) {
Expand All @@ -147,10 +147,8 @@ private static Plan.MemberVariableField serializeMemberVariable(Object fieldObje
builder.setLiteralField(doubleField((Double) fieldObject));
} else if (fieldObject instanceof String) {
builder.setLiteralField(stringField((String) fieldObject));
} else if (fieldObject instanceof byte[]) {
builder.setLiteralField(bytesField(ByteString.copyFrom((byte[]) fieldObject)));
} else if (fieldObject instanceof GregorianCalendar) {
builder.setLiteralField(longField(((GregorianCalendar) fieldObject).getTimeInMillis()));
} else if (fieldObject instanceof ByteArray) {
builder.setLiteralField(bytesField((ByteArray) fieldObject));
} else if (fieldObject instanceof List) {
builder.setListField(serializeListMemberVariable(fieldObject));
} else if (fieldObject instanceof Map) {
Expand Down Expand Up @@ -215,7 +213,7 @@ private static Object constructLiteral(Plan.LiteralField literalField) {
case STRINGFIELD:
return literalField.getStringField();
case BYTESFIELD:
return literalField.getBytesField();
return new ByteArray(literalField.getBytesField().toByteArray());
case LITERALFIELD_NOT_SET:
default:
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ public void setUp()
_mailboxService.start();

QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
server1.start();
// Start server1 to ensure the next server will have a different port.
QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
server2.start();
// this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
// this is only use for test identifier purpose.
Expand Down

0 comments on commit 8f04044

Please sign in to comment.