Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper null handling in SELECT, ORDER BY, DISTINCT, and GROUP BY #8927

Merged
merged 25 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface DataTable {

Map<Integer, String> getExceptions();

/**
* Returns the version of this data table.
*/
int getVersion();

byte[] toBytes()
throws IOException;

Expand Down Expand Up @@ -77,6 +79,7 @@ byte[] toBytes()

String[] getStringArray(int rowId, int colId);

/**
* Returns the null row ids of the given column as a bitmap.
*
* @param colId id of the column to look up
* @return the bitmap of null row ids for the column; may be {@code null}, as marked by {@code @Nullable}
*/
@Nullable
RoaringBitmap getNullRowIds(int colId);

DataTable toMetadataOnlyDataTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand All @@ -30,6 +31,12 @@
*/
public interface BlockValSet {

/**
* Returns the null value bitmap in the value set.
*
* @return the null value bitmap; may be {@code null}, as marked by {@code @Nullable}
*/
@Nullable
RoaringBitmap getNullBitmap();

/**
* Returns the data type of the values in the value set.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
Expand Down Expand Up @@ -200,6 +202,11 @@ public BaseDataBlock(ByteBuffer byteBuffer)
}
}

/**
* Returns the data table version of this data block, which is always {@link DataTableFactory#VERSION_4}.
*/
@Override
public int getVersion() {
return DataTableFactory.VERSION_4;
}

/**
* Return the int serialized form of the data block version and type.
* @return the int serialized form of the data block version and type
Expand Down Expand Up @@ -239,6 +246,7 @@ public int getNumberOfRows() {
return _numRows;
}

@Nullable
@Override
public RoaringBitmap getNullRowIds(int colId) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -49,27 +50,25 @@ public RowDataBlock(ByteBuffer byteBuffer)
computeBlockObjectConstants();
}

@Nullable
@Override
public RoaringBitmap getNullRowIds(int colId) {
// _fixedSizeData stores two ints per col's null bitmap: offset, and length.
int position = _numRows * _rowSizeInBytes + colId * Integer.BYTES * 2;
if (position >= _fixedSizeData.limit()) {
if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
return null;
}

_fixedSizeData.position(position);
int offset = _fixedSizeData.getInt();
int bytesLength = _fixedSizeData.getInt();
RoaringBitmap nullBitmap;
if (bytesLength > 0) {
_variableSizeData.position(offset);
byte[] nullBitmapBytes = new byte[bytesLength];
_variableSizeData.get(nullBitmapBytes);
nullBitmap = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
} else {
nullBitmap = new RoaringBitmap();
return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
}
return nullBitmap;
return null;
}

protected void computeBlockObjectConstants() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ public BaseDataTable() {
_metadata = new HashMap<>();
}

/**
* get the current data table version.
*/
public abstract int getVersion();

/**
* Helper method to serialize dictionary map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ private static DataTable buildEmptyDataTableForDistinctQuery(QueryContext queryC
ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length];
// NOTE: Use STRING column data type as default for distinct query
Arrays.fill(columnDataTypes, ColumnDataType.STRING);
DistinctTable distinctTable =
new DistinctTable(new DataSchema(columnNames, columnDataTypes), Collections.emptySet());
DistinctTable distinctTable = new DistinctTable(
new DataSchema(columnNames, columnDataTypes), Collections.emptySet(), queryContext.isNullHandlingEnabled());

// Build the data table
DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,39 @@ public TableResizer(DataSchema dataSchema, QueryContext queryContext) {
_orderByValueExtractors[i] = getOrderByValueExtractor(orderByExpression.getExpression());
comparators[i] = orderByExpression.isAsc() ? Comparator.naturalOrder() : Comparator.reverseOrder();
}
_intermediateRecordComparator = (o1, o2) -> {
for (int i = 0; i < _numOrderByExpressions; i++) {
int result = comparators[i].compare(o1._values[i], o2._values[i]);
if (result != 0) {
return result;
boolean nullHandlingEnabled = queryContext.isNullHandlingEnabled();
if (nullHandlingEnabled) {
_intermediateRecordComparator = (o1, o2) -> {
for (int i = 0; i < _numOrderByExpressions; i++) {
Object v1 = o1._values[i];
Object v2 = o2._values[i];
if (v1 == null) {
if (v2 == null) {
continue;
}
// The default null ordering is NULLS LAST, regardless of the ordering direction.
return 1;
} else if (v2 == null) {
return -1;
}
int result = comparators[i].compare(v1, v2);
if (result != 0) {
return result;
}
}
}
return 0;
};
return 0;
};
} else {
_intermediateRecordComparator = (o1, o2) -> {
for (int i = 0; i < _numOrderByExpressions; i++) {
int result = comparators[i].compare(o1._values[i], o2._values[i]);
if (result != 0) {
return result;
}
}
return 0;
};
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.spi.data.FieldSpecUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;


/**
Expand All @@ -54,6 +56,7 @@
@SuppressWarnings("rawtypes")
public class IntermediateResultsBlock implements Block {
private DataSchema _dataSchema;
private boolean _nullHandlingEnabled;
private Collection<Object[]> _selectionResult;
private AggregationFunction[] _aggregationFunctions;
private List<Object> _aggregationResult;
Expand All @@ -80,9 +83,11 @@ public IntermediateResultsBlock() {
/**
* Constructor for selection result.
*/
public IntermediateResultsBlock(DataSchema dataSchema, Collection<Object[]> selectionResult) {
public IntermediateResultsBlock(DataSchema dataSchema, Collection<Object[]> selectionResult,
boolean nullHandlingEnabled) {
_dataSchema = dataSchema;
_selectionResult = selectionResult;
_nullHandlingEnabled = nullHandlingEnabled;
}

/**
Expand All @@ -95,30 +100,45 @@ public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions, List
_aggregationResult = aggregationResult;
}

/**
* Constructor for aggregation result with an explicit data schema.
* <p>For aggregation only, the result is a list of values.
* <p>For aggregation group-by, the result is a list of maps from group keys to aggregation values.
* <p>Delegates to the two-argument aggregation constructor, then records the given data schema.
*
* @param aggregationFunctions aggregation functions, passed through to the two-argument constructor
* @param aggregationResult aggregation result, passed through to the two-argument constructor
* @param dataSchema data schema stored in {@code _dataSchema}
*/
public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions, List<Object> aggregationResult,
DataSchema dataSchema) {
this(aggregationFunctions, aggregationResult);
_dataSchema = dataSchema;
}

/**
* Constructor for aggregation group-by order-by result with {@link AggregationGroupByResult}.
*/
public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
@Nullable AggregationGroupByResult aggregationGroupByResults, DataSchema dataSchema) {
@Nullable AggregationGroupByResult aggregationGroupByResults, DataSchema dataSchema,
boolean nullHandlingEnabled) {
_aggregationFunctions = aggregationFunctions;
_aggregationGroupByResult = aggregationGroupByResults;
_dataSchema = dataSchema;
_nullHandlingEnabled = nullHandlingEnabled;
}

/**
* Constructor for aggregation group-by order-by result with {@link AggregationGroupByResult} and
* with a collection of intermediate records.
*/
public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
Collection<IntermediateRecord> intermediateRecords, DataSchema dataSchema) {
Collection<IntermediateRecord> intermediateRecords, DataSchema dataSchema, boolean nullHandlingEnabled) {
_aggregationFunctions = aggregationFunctions;
_dataSchema = dataSchema;
_intermediateRecords = intermediateRecords;
_nullHandlingEnabled = nullHandlingEnabled;
}

public IntermediateResultsBlock(Table table) {
public IntermediateResultsBlock(Table table, boolean nullHandlingEnabled) {
_table = table;
_dataSchema = table.getDataSchema();
_nullHandlingEnabled = nullHandlingEnabled;
}

/**
Expand All @@ -127,6 +147,7 @@ public IntermediateResultsBlock(Table table) {
public IntermediateResultsBlock(ProcessingException processingException, Exception e) {
_processingExceptions = new ArrayList<>();
_processingExceptions.add(QueryException.getException(processingException, e));
_nullHandlingEnabled = false;
}

/**
Expand Down Expand Up @@ -311,16 +332,47 @@ private DataTable getResultDataTable()
throws IOException {
DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(_dataSchema);
ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
int numColumns = _dataSchema.size();
Iterator<Record> iterator = _table.iterator();
while (iterator.hasNext()) {
Record record = iterator.next();
dataTableBuilder.startRow();
int columnIndex = 0;
for (Object value : record.getValues()) {
setDataTableColumn(storedColumnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
columnIndex++;
if (_nullHandlingEnabled) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
Object[] colDefaultNullValues = new Object[numColumns];
for (int colId = 0; colId < numColumns; colId++) {
if (storedColumnDataTypes[colId] != ColumnDataType.OBJECT) {
colDefaultNullValues[colId] =
FieldSpecUtils.getDefaultNullValue(storedColumnDataTypes[colId].toDataType());
}
nullBitmaps[colId] = new RoaringBitmap();
}
int rowId = 0;
while (iterator.hasNext()) {
Object[] values = iterator.next().getValues();
dataTableBuilder.startRow();
for (int columnIndex = 0; columnIndex < values.length; columnIndex++) {
Object value = values[columnIndex];
if (value == null) {
value = colDefaultNullValues[columnIndex];
nullBitmaps[columnIndex].add(rowId);
}
setDataTableColumn(storedColumnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
}
dataTableBuilder.finishRow();
rowId++;
}
for (int colId = 0; colId < numColumns; colId++) {
dataTableBuilder.setNullRowIds(nullBitmaps[colId]);
}
} else {
while (iterator.hasNext()) {
Record record = iterator.next();
dataTableBuilder.startRow();
int columnIndex = 0;
for (Object value : record.getValues()) {
setDataTableColumn(storedColumnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
columnIndex++;
}
dataTableBuilder.finishRow();
}
dataTableBuilder.finishRow();
}
DataTable dataTable = dataTableBuilder.build();
return attachMetadataToDataTable(dataTable);
Expand Down Expand Up @@ -376,7 +428,8 @@ private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder

private DataTable getSelectionResultDataTable()
throws Exception {
return attachMetadataToDataTable(SelectionOperatorUtils.getDataTableFromRows(_selectionResult, _dataSchema));
return attachMetadataToDataTable(SelectionOperatorUtils.getDataTableFromRows(
_selectionResult, _dataSchema, _nullHandlingEnabled));
}

private DataTable getAggregationResultDataTable()
Expand All @@ -390,28 +443,55 @@ private DataTable getAggregationResultDataTable()
columnNames[i] = aggregationFunction.getColumnName();
columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType();
}
RoaringBitmap[] nullBitmaps = null;
Object[] colDefaultNullValues = null;
if (_nullHandlingEnabled) {
colDefaultNullValues = new Object[numAggregationFunctions];
nullBitmaps = new RoaringBitmap[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
if (columnDataTypes[i] != ColumnDataType.OBJECT) {
colDefaultNullValues[i] = FieldSpecUtils.getDefaultNullValue(columnDataTypes[i].toDataType());
}
nullBitmaps[i] = new RoaringBitmap();
}
}

// Build the data table.
DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes));
dataTableBuilder.startRow();
for (int i = 0; i < numAggregationFunctions; i++) {
Object value = _aggregationResult.get(i);
// OBJECT (e.g. DistinctTable) calls toBytes() (e.g. DistinctTable.toBytes()) which takes care of replacing nulls
// with default values, and building presence vector and serializing both.
if (_nullHandlingEnabled && columnDataTypes[i] != ColumnDataType.OBJECT) {
if (value == null) {
value = colDefaultNullValues[i];
nullBitmaps[i].add(0);
}
}

switch (columnDataTypes[i]) {
case LONG:
dataTableBuilder.setColumn(i, ((Number) _aggregationResult.get(i)).longValue());
dataTableBuilder.setColumn(i, ((Number) value).longValue());
break;
case DOUBLE:
dataTableBuilder.setColumn(i, ((Double) _aggregationResult.get(i)).doubleValue());
dataTableBuilder.setColumn(i, ((Double) value).doubleValue());
break;
case OBJECT:
dataTableBuilder.setColumn(i, _aggregationResult.get(i));
dataTableBuilder.setColumn(i, value);
break;
default:
throw new UnsupportedOperationException(
"Unsupported aggregation column data type: " + columnDataTypes[i] + " for column: " + columnNames[i]);
}
}
dataTableBuilder.finishRow();
if (_nullHandlingEnabled) {
for (int i = 0; i < numAggregationFunctions; i++) {
dataTableBuilder.setNullRowIds(nullBitmaps[i]);
}
}
DataTable dataTable = dataTableBuilder.build();

return attachMetadataToDataTable(dataTable);
Expand Down
Loading