Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper null handling in SELECT, ORDER BY, DISTINCT, and GROUP BY #8927

Merged
merged 25 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface DataTable {

Map<Integer, String> getExceptions();

int getVersion();

byte[] toBytes()
throws IOException;

Expand Down Expand Up @@ -77,6 +79,7 @@ byte[] toBytes()

String[] getStringArray(int rowId, int colId);

@Nullable
RoaringBitmap getNullRowIds(int colId);

DataTable toMetadataOnlyDataTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand All @@ -30,6 +31,12 @@
*/
public interface BlockValSet {

/**
* Returns the null value bitmap in the value set.
*/
@Nullable
RoaringBitmap getNullBitmap();

/**
* Returns the data type of the values in the value set.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
Expand Down Expand Up @@ -200,6 +202,11 @@ public BaseDataBlock(ByteBuffer byteBuffer)
}
}

@Override
public int getVersion() {
return DataTableFactory.VERSION_4;
}

/**
* Return the int serialized form of the data block version and type.
* @return
Expand Down Expand Up @@ -239,6 +246,7 @@ public int getNumberOfRows() {
return _numRows;
}

@Nullable
@Override
public RoaringBitmap getNullRowIds(int colId) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -49,27 +50,25 @@ public RowDataBlock(ByteBuffer byteBuffer)
computeBlockObjectConstants();
}

@Nullable
@Override
public RoaringBitmap getNullRowIds(int colId) {
// _fixedSizeData stores two ints per col's null bitmap: offset, and length.
int position = _numRows * _rowSizeInBytes + colId * Integer.BYTES * 2;
if (position >= _fixedSizeData.limit()) {
if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
return null;
}

_fixedSizeData.position(position);
int offset = _fixedSizeData.getInt();
int bytesLength = _fixedSizeData.getInt();
RoaringBitmap nullBitmap;
if (bytesLength > 0) {
_variableSizeData.position(offset);
byte[] nullBitmapBytes = new byte[bytesLength];
_variableSizeData.get(nullBitmapBytes);
nullBitmap = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
} else {
nullBitmap = new RoaringBitmap();
return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
}
return nullBitmap;
return null;
}

protected void computeBlockObjectConstants() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ public BaseDataTable() {
_metadata = new HashMap<>();
}

/**
* get the current data table version.
*/
public abstract int getVersion();

/**
* Helper method to serialize dictionary map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ private static DataTable buildEmptyDataTableForDistinctQuery(QueryContext queryC
ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length];
// NOTE: Use STRING column data type as default for distinct query
Arrays.fill(columnDataTypes, ColumnDataType.STRING);
DistinctTable distinctTable =
new DistinctTable(new DataSchema(columnNames, columnDataTypes), Collections.emptySet());
DistinctTable distinctTable = new DistinctTable(
new DataSchema(columnNames, columnDataTypes), Collections.emptySet(), queryContext.isNullHandlingEnabled());

// Build the data table
DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,39 @@ public TableResizer(DataSchema dataSchema, QueryContext queryContext) {
_orderByValueExtractors[i] = getOrderByValueExtractor(orderByExpression.getExpression());
comparators[i] = orderByExpression.isAsc() ? Comparator.naturalOrder() : Comparator.reverseOrder();
}
_intermediateRecordComparator = (o1, o2) -> {
for (int i = 0; i < _numOrderByExpressions; i++) {
int result = comparators[i].compare(o1._values[i], o2._values[i]);
if (result != 0) {
return result;
boolean nullHandlingEnabled = queryContext.isNullHandlingEnabled();
if (nullHandlingEnabled) {
_intermediateRecordComparator = (o1, o2) -> {
for (int i = 0; i < _numOrderByExpressions; i++) {
Object v1 = o1._values[i];
Object v2 = o2._values[i];
if (v1 == null) {
if (v2 == null) {
continue;
}
// The default null ordering is NULLS LAST, regardless of the ordering direction.
return 1;
} else if (v2 == null) {
return -1;
}
int result = comparators[i].compare(v1, v2);
if (result != 0) {
return result;
}
}
}
return 0;
};
return 0;
};
} else {
_intermediateRecordComparator = (o1, o2) -> {
for (int i = 0; i < _numOrderByExpressions; i++) {
int result = comparators[i].compare(o1._values[i], o2._values[i]);
if (result != 0) {
return result;
}
}
return 0;
};
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.NullValueUtils;
import org.roaringbitmap.RoaringBitmap;


/**
Expand All @@ -54,6 +56,7 @@
@SuppressWarnings("rawtypes")
public class IntermediateResultsBlock implements Block {
private DataSchema _dataSchema;
private boolean _nullHandlingEnabled;
private Collection<Object[]> _selectionResult;
private AggregationFunction[] _aggregationFunctions;
private List<Object> _aggregationResult;
Expand All @@ -80,9 +83,11 @@ public IntermediateResultsBlock() {
/**
* Constructor for selection result.
*/
public IntermediateResultsBlock(DataSchema dataSchema, Collection<Object[]> selectionResult) {
public IntermediateResultsBlock(DataSchema dataSchema, Collection<Object[]> selectionResult,
boolean nullHandlingEnabled) {
_dataSchema = dataSchema;
_selectionResult = selectionResult;
_nullHandlingEnabled = nullHandlingEnabled;
}

/**
Expand Down Expand Up @@ -116,9 +121,10 @@ public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
_intermediateRecords = intermediateRecords;
}

public IntermediateResultsBlock(Table table) {
public IntermediateResultsBlock(Table table, boolean nullHandlingEnabled) {
_table = table;
_dataSchema = table.getDataSchema();
_nullHandlingEnabled = nullHandlingEnabled;
}

/**
Expand All @@ -127,6 +133,7 @@ public IntermediateResultsBlock(Table table) {
public IntermediateResultsBlock(ProcessingException processingException, Exception e) {
_processingExceptions = new ArrayList<>();
_processingExceptions.add(QueryException.getException(processingException, e));
_nullHandlingEnabled = false;
}

/**
Expand Down Expand Up @@ -311,16 +318,47 @@ private DataTable getResultDataTable()
throws IOException {
DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(_dataSchema);
ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
int numColumns = _dataSchema.size();
Iterator<Record> iterator = _table.iterator();
while (iterator.hasNext()) {
Record record = iterator.next();
dataTableBuilder.startRow();
int columnIndex = 0;
for (Object value : record.getValues()) {
setDataTableColumn(storedColumnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
columnIndex++;
if (_nullHandlingEnabled) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
Object[] colDefaultNullValues = new Object[numColumns];
for (int colId = 0; colId < numColumns; colId++) {
if (storedColumnDataTypes[colId] != ColumnDataType.OBJECT) {
colDefaultNullValues[colId] =
NullValueUtils.getDefaultNullValue(storedColumnDataTypes[colId].toDataType());
}
nullBitmaps[colId] = new RoaringBitmap();
}
int rowId = 0;
while (iterator.hasNext()) {
Object[] values = iterator.next().getValues();
dataTableBuilder.startRow();
for (int columnIndex = 0; columnIndex < values.length; columnIndex++) {
Object value = values[columnIndex];
if (value == null) {
value = colDefaultNullValues[columnIndex];
nullBitmaps[columnIndex].add(rowId);
}
setDataTableColumn(storedColumnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
}
dataTableBuilder.finishRow();
rowId++;
}
for (int colId = 0; colId < numColumns; colId++) {
dataTableBuilder.setNullRowIds(nullBitmaps[colId]);
}
} else {
while (iterator.hasNext()) {
Record record = iterator.next();
dataTableBuilder.startRow();
int columnIndex = 0;
for (Object value : record.getValues()) {
setDataTableColumn(storedColumnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
columnIndex++;
}
dataTableBuilder.finishRow();
}
dataTableBuilder.finishRow();
}
DataTable dataTable = dataTableBuilder.build();
return attachMetadataToDataTable(dataTable);
Expand Down Expand Up @@ -376,7 +414,8 @@ private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder

private DataTable getSelectionResultDataTable()
throws Exception {
return attachMetadataToDataTable(SelectionOperatorUtils.getDataTableFromRows(_selectionResult, _dataSchema));
return attachMetadataToDataTable(SelectionOperatorUtils.getDataTableFromRows(
_selectionResult, _dataSchema, _nullHandlingEnabled));
}

private DataTable getAggregationResultDataTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public BlockValSet getBlockValueSet(ExpressionContext expression) {
if (expression.getType() == ExpressionContext.Type.IDENTIFIER) {
return _projectionBlock.getBlockValueSet(expression.getIdentifier());
} else {
return new TransformBlockValSet(_projectionBlock, _transformFunctionMap.get(expression));
return new TransformBlockValSet(_projectionBlock, _transformFunctionMap.get(expression), expression);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, Intermed

// Convert the merged table into a main table if necessary in order to merge other tables
if (!mergedDistinctTable.isMainTable()) {
DistinctTable mainDistinctTable =
new DistinctTable(distinctTableToMerge.getDataSchema(), _queryContext.getOrderByExpressions(),
_queryContext.getLimit());
DistinctTable mainDistinctTable = new DistinctTable(distinctTableToMerge.getDataSchema(),
_queryContext.getOrderByExpressions(), _queryContext.getLimit(), _queryContext.isNullHandlingEnabled());
mainDistinctTable.mergeTable(mergedDistinctTable);
mergedBlock.setAggregationResults(Collections.singletonList(mainDistinctTable));
mergedDistinctTable = mainDistinctTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ protected IntermediateResultsBlock mergeResults()

IndexedTable indexedTable = _indexedTable;
indexedTable.finish(false);
IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(indexedTable);
IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(
indexedTable, _queryContext.isNullHandlingEnabled());
mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
mergedBlock.setNumResizes(indexedTable.getNumResizes());
mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
// special IntermediateResultsBlock into the BlockingQueue to awake the main thread
private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
Collections.emptyList());
Collections.emptyList(), false);

// Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
private final AtomicInteger _numOperatorsSkipped = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.pinot.core.operator.ProjectionOperator;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.trace.InvocationRecording;
import org.apache.pinot.spi.trace.InvocationScope;
import org.apache.pinot.spi.trace.Tracing;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;


/**
Expand All @@ -41,6 +44,9 @@ public class ProjectionBlockValSet implements BlockValSet {
private final String _column;
private final DataSource _dataSource;

private boolean _nullBitmapSet;
private RoaringBitmap _nullBitmap;

/**
* Constructor for the class.
* The dataBlockCache is initialized in {@link ProjectionOperator} so that it can be reused across multiple calls to
Expand All @@ -52,6 +58,30 @@ public ProjectionBlockValSet(DataBlockCache dataBlockCache, String column, DataS
_dataSource = dataSource;
}

@Nullable
@Override
public RoaringBitmap getNullBitmap() {
if (!_nullBitmapSet) {
NullValueVectorReader nullValueReader = _dataSource.getNullValueVector();
ImmutableRoaringBitmap nullBitmap = nullValueReader != null ? nullValueReader.getNullBitmap() : null;
if (nullBitmap != null && !nullBitmap.isEmpty()) {
// Project null bitmap.
RoaringBitmap projectedNullBitmap = new RoaringBitmap();
int[] docIds = _dataBlockCache.getDocIds();
for (int i = 0; i < _dataBlockCache.getNumDocs(); i++) {
if (nullBitmap.contains(docIds[i])) {
projectedNullBitmap.add(i);
}
}
_nullBitmap = projectedNullBitmap;
} else {
_nullBitmap = null;
}
_nullBitmapSet = true;
}
return _nullBitmap;
}

@Override
public DataType getValueType() {
return _dataSource.getDataSourceMetadata().getDataType();
Expand Down
Loading