Skip to content

Commit

Permalink
Refactor single column distinct executors (#10859)
Browse files Browse the repository at this point in the history
  • Loading branch information
shenyu0127 authored Jun 9, 2023
1 parent a75544f commit 880a074
Show file tree
Hide file tree
Showing 21 changed files with 303 additions and 471 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand Down Expand Up @@ -63,4 +66,31 @@ public DistinctTable getResult() {
}
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}

@Override
public boolean process(ValueBlock valueBlock) {
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
BigDecimal[] values = blockValueSet.getBigDecimalValuesSV();
int numDocs = valueBlock.getNumDocs();
if (_nullHandlingEnabled) {
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
for (int i = 0; i < numDocs; i++) {
if (nullBitmap != null && nullBitmap.contains(i)) {
values[i] = null;
}
if (add(values[i])) {
return true;
}
}
} else {
for (int i = 0; i < numDocs; i++) {
if (add(values[i])) {
return true;
}
}
}
return false;
}

protected abstract boolean add(BigDecimal value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;


/**
Expand Down Expand Up @@ -63,4 +66,31 @@ public DistinctTable getResult() {
}
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}

@Override
public boolean process(ValueBlock valueBlock) {
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
byte[][] values = blockValueSet.getBytesValuesSV();
int numDocs = valueBlock.getNumDocs();
if (_nullHandlingEnabled) {
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
for (int i = 0; i < numDocs; i++) {
if (nullBitmap != null && nullBitmap.contains(i)) {
values[i] = null;
}
if (add(values[i])) {
return true;
}
}
} else {
for (int i = 0; i < numDocs; i++) {
if (add(values[i])) {
return true;
}
}
}
return false;
}

protected abstract boolean add(byte[] value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand Down Expand Up @@ -68,4 +71,41 @@ public DistinctTable getResult() {
}
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}

@Override
public boolean process(ValueBlock valueBlock) {
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
int numDocs = valueBlock.getNumDocs();
if (blockValueSet.isSingleValue()) {
double[] values = blockValueSet.getDoubleValuesSV();
if (_nullHandlingEnabled) {
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
for (int i = 0; i < numDocs; i++) {
if (nullBitmap != null && nullBitmap.contains(i)) {
_hasNull = true;
} else if (add(values[i])) {
return true;
}
}
} else {
for (int i = 0; i < numDocs; i++) {
if (add(values[i])) {
return true;
}
}
}
} else {
int[][] values = blockValueSet.getIntValuesMV();
for (int i = 0; i < numDocs; i++) {
for (double value : values[i]) {
if (add(value)) {
return true;
}
}
}
}
return false;
}

protected abstract boolean add(double val);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand Down Expand Up @@ -69,4 +72,41 @@ public DistinctTable getResult() {
assert records.size() <= _limit;
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}

@Override
public boolean process(ValueBlock valueBlock) {
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
int numDocs = valueBlock.getNumDocs();
if (blockValueSet.isSingleValue()) {
float[] values = blockValueSet.getFloatValuesSV();
if (_nullHandlingEnabled) {
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
for (int i = 0; i < numDocs; i++) {
if (nullBitmap != null && nullBitmap.contains(i)) {
_hasNull = true;
} else if (add(values[i])) {
return true;
}
}
} else {
for (int i = 0; i < numDocs; i++) {
if (add(values[i])) {
return true;
}
}
}
} else {
float[][] values = blockValueSet.getFloatValuesMV();
for (int i = 0; i < numDocs; i++) {
for (float value : values[i]) {
if (add(value)) {
return true;
}
}
}
}
return false;
}

protected abstract boolean add(float val);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand Down Expand Up @@ -70,4 +73,41 @@ public DistinctTable getResult() {
assert records.size() <= _limit;
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}

@Override
public boolean process(ValueBlock valueBlock) {
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
int numDocs = valueBlock.getNumDocs();
if (blockValueSet.isSingleValue()) {
int[] values = blockValueSet.getIntValuesSV();
if (_nullHandlingEnabled) {
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
for (int i = 0; i < numDocs; i++) {
if (nullBitmap != null && nullBitmap.contains(i)) {
_hasNull = true;
} else if (add(values[i])) {
return true;
}
}
} else {
for (int i = 0; i < numDocs; i++) {
if (add(values[i])) {
return true;
}
}
}
} else {
int[][] values = blockValueSet.getIntValuesMV();
for (int i = 0; i < numDocs; i++) {
for (int value : values[i]) {
if (add(value)) {
return true;
}
}
}
}
return false;
}

protected abstract boolean add(int val);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand Down Expand Up @@ -69,4 +72,41 @@ public DistinctTable getResult() {
assert records.size() <= _limit;
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}

@Override
public boolean process(ValueBlock valueBlock) {
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
int numDocs = valueBlock.getNumDocs();
if (blockValueSet.isSingleValue()) {
long[] values = blockValueSet.getLongValuesSV();
if (_nullHandlingEnabled) {
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
for (int i = 0; i < numDocs; i++) {
if (nullBitmap != null && nullBitmap.contains(i)) {
_hasNull = true;
} else if (add(values[i])) {
return true;
}
}
} else {
for (int i = 0; i < numDocs; i++) {
if (add(values[i])) {
return true;
}
}
}
} else {
long[][] values = blockValueSet.getLongValuesMV();
for (int i = 0; i < numDocs; i++) {
for (long value : values[i]) {
if (add(value)) {
return true;
}
}
}
}
return false;
}

protected abstract boolean add(long val);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.distinct.DistinctExecutor;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.roaringbitmap.RoaringBitmap;


/**
Expand Down Expand Up @@ -62,4 +65,42 @@ public DistinctTable getResult() {
}
return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
}

@Override
public boolean process(ValueBlock valueBlock) {
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
int numDocs = valueBlock.getNumDocs();
if (blockValueSet.isSingleValue()) {
String[] values = blockValueSet.getStringValuesSV();
if (_nullHandlingEnabled) {
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
for (int i = 0; i < numDocs; i++) {
if (nullBitmap != null && nullBitmap.contains(i)) {
values[i] = null;
}
if (add(values[i])) {
return true;
}
}
} else {
for (int i = 0; i < numDocs; i++) {
if (add(values[i])) {
return true;
}
}
}
} else {
String[][] values = blockValueSet.getStringValuesMV();
for (int i = 0; i < numDocs; i++) {
for (String value : values[i]) {
if (add(value)) {
return true;
}
}
}
}
return false;
}

protected abstract boolean add(String val);
}
Loading

0 comments on commit 880a074

Please sign in to comment.