Skip to content

Commit

Permalink
Support scan filter for decimal in ORC
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Feb 11, 2025
1 parent 04bfdff commit 6e1ad03
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 13 deletions.
173 changes: 161 additions & 12 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {

template <typename DataT>
template <bool kDense>
void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
vector_size_t numRows = rows.back() + 1;
void SelectiveDecimalColumnReader<DataT>::readHelper(
common::Filter* filter,
RowSet rows) {
ExtractToReader extractValues(this);
common::AlwaysTrue filter;
common::AlwaysTrue alwaysTrue;
DirectRleColumnVisitor<
int64_t,
common::AlwaysTrue,
decltype(extractValues),
kDense>
visitor(filter, this, rows, extractValues);
visitor(alwaysTrue, this, rows, extractValues);

// decode scale stream
if (version_ == velox::dwrf::RleVersion_1) {
Expand All @@ -96,46 +97,194 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
// reset numValues_ before reading values
numValues_ = 0;
valueSize_ = sizeof(DataT);
vector_size_t numRows = rows.back() + 1;
ensureValuesCapacity<DataT>(numRows);

// decode value stream
facebook::velox::dwio::common::
ColumnVisitor<DataT, common::AlwaysTrue, decltype(extractValues), kDense>
valueVisitor(filter, this, rows, extractValues);
valueVisitor(alwaysTrue, this, rows, extractValues);
decodeWithVisitor<DirectDecoder<true>>(valueDecoder_.get(), valueVisitor);
readOffset_ += numRows;

// Fill decimals before applying filter.
fillDecimals();

const auto rawNulls = nullsInReadRange_
? (kDense ? nullsInReadRange_->as<uint64_t>() : rawResultNulls_)
: nullptr;
// Process filter.
process(filter, rows, rawNulls);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processNulls(
bool isNull,
const RowSet& rows,
const uint64_t* rawNulls) {
if (!rawNulls) {
return;
}
returnReaderNulls_ = false;
anyNulls_ = !isNull;
allNull_ = isNull;

auto rawDecimal = values_->asMutable<DataT>();
auto rawScale = scaleBuffer_->asMutable<int64_t>();

vector_size_t idx = 0;
if (isNull) {
for (vector_size_t i = 0; i < numValues_; i++) {
if (bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
idx++;
}
}
} else {
for (vector_size_t i = 0; i < numValues_; i++) {
if (!bits::isBitNull(rawNulls, i)) {
bits::setNull(rawResultNulls_, idx, false);
rawDecimal[idx] = rawDecimal[i];
rawScale[idx] = rawScale[i];
addOutputRow(rows[i]);
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
returnReaderNulls_ = false;
anyNulls_ = false;
allNull_ = true;

auto rawDecimal = values_->asMutable<DataT>();

vector_size_t idx = 0;
for (vector_size_t i = 0; i < numValues_; i++) {
if (rawNulls && bits::isBitNull(rawNulls, i)) {
if (filter->testNull()) {
bits::setNull(rawResultNulls_, idx);
addOutputRow(rows[i]);
anyNulls_ = true;
idx++;
}
} else {
bool tested;
if constexpr (std::is_same_v<DataT, int64_t>) {
tested = filter->testInt64(rawDecimal[i]);
} else {
tested = filter->testInt128(rawDecimal[i]);
}

if (tested) {
if (rawNulls) {
bits::setNull(rawResultNulls_, idx, false);
}
rawDecimal[idx] = rawDecimal[i];
addOutputRow(rows[i]);
allNull_ = false;
idx++;
}
}
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls) {
// Treat the filter as kAlwaysTrue if any of the following conditions are met:
// 1) No filter found;
// 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null).
auto filterKind =
!filter || (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls)
? common::FilterKind::kAlwaysTrue
: filter->kind();
switch (filterKind) {
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);
}
break;
case common::FilterKind::kIsNull:
processNulls(true, rows, rawNulls);
break;
case common::FilterKind::kIsNotNull:
processNulls(false, rows, rawNulls);
break;
case common::FilterKind::kBigintRange:
case common::FilterKind::kBigintValuesUsingHashTable:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kNegatedBigintRange:
case common::FilterKind::kNegatedBigintValuesUsingHashTable:
case common::FilterKind::kNegatedBigintValuesUsingBitmask:
case common::FilterKind::kBigintMultiRange: {
if constexpr (std::is_same_v<DataT, int64_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type BIGINT, but found file type {}.",
actualType->toString());
}
break;
}
case common::FilterKind::kHugeintValuesUsingHashTable:
case common::FilterKind::kHugeintRange: {
if constexpr (std::is_same_v<DataT, int128_t>) {
processFilter(filter, rows, rawNulls);
} else {
const auto actualType = CppToType<DataT>::create();
VELOX_NYI(
"Expected type HUGEINT, but found file type {}.",
actualType->toString());
}
break;
}
default:
VELOX_NYI("Unsupported filter: {}.", static_cast<int>(filterKind));
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::read(
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
VELOX_CHECK(!scanSpec_->filter());
VELOX_CHECK(!scanSpec_->valueHook());
prepareRead<int64_t>(offset, rows, incomingNulls);
bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
readHelper<true>(rows);
readHelper<true>(scanSpec_->filter(), rows);
} else {
readHelper<false>(rows);
readHelper<false>(scanSpec_->filter(), rows);
}
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::getValues(
const RowSet& rows,
VectorPtr* result) {
rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
auto nullsPtr =
resultNulls() ? resultNulls()->template as<uint64_t>() : nullptr;
auto scales = scaleBuffer_->as<int64_t>();
auto values = values_->asMutable<DataT>();

DecimalUtil::fillDecimals<DataT>(
values, nullsPtr, values, scales, numValues_, scale_);

rawValues_ = values_->asMutable<char>();
getIntValues(rows, requestedType_, result);
}

template class SelectiveDecimalColumnReader<int64_t>;
Expand Down
19 changes: 18 additions & 1 deletion velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,24 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader {

private:
template <bool kDense>
void readHelper(RowSet rows);
void readHelper(common::Filter* filter, RowSet rows);

// Process IsNull and IsNotNull filters.
void processNulls(bool isNull, const RowSet& rows, const uint64_t* rawNulls);

// Process filters on decimal values.
void processFilter(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

// Dispatch to the respective filter processing based on the filter type.
void process(
const common::Filter* filter,
const RowSet& rows,
const uint64_t* rawNulls);

void fillDecimals();

std::unique_ptr<IntDecoder<true>> valueDecoder_;
std::unique_ptr<IntDecoder<true>> scaleDecoder_;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ target_link_libraries(
velox_aggregates
velox_dwio_common
velox_dwio_common_exception
velox_dwio_orc_reader
velox_dwio_common_test_utils
velox_dwio_parquet_reader
velox_dwio_parquet_writer
Expand Down
Loading

0 comments on commit 6e1ad03

Please sign in to comment.