Skip to content

Commit

Permalink
Fix upsert inconsistency by snapshotting the validDocIds before reading the numDocs (#8392)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Mar 23, 2022
1 parent 6d2f749 commit 5e7c005
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
Expand All @@ -45,57 +46,55 @@
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.QueryOptionsUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.roaringbitmap.buffer.MutableRoaringBitmap;


public class FilterPlanNode implements PlanNode {
private final IndexSegment _indexSegment;
private final QueryContext _queryContext;
private final int _numDocs;
private FilterContext _filterContext;
private final FilterContext _filter;

// Cache the predicate evaluators
private final Map<Predicate, PredicateEvaluator> _predicateEvaluatorMap = new HashMap<>();

public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
_indexSegment = indexSegment;
_queryContext = queryContext;
// NOTE: Fetch number of documents in the segment when creating the plan node so that it is consistent among all
// filter operators. Number of documents will keep increasing for MutableSegment (CONSUMING segment).
_numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
this(indexSegment, queryContext, null);
}

public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext, FilterContext filterContext) {
this(indexSegment, queryContext);

_filterContext = filterContext;
public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext, @Nullable FilterContext filter) {
_indexSegment = indexSegment;
_queryContext = queryContext;
_filter = filter;
}

@Override
public BaseFilterOperator run() {
FilterContext filter = _filterContext == null ? _queryContext.getFilter() : _filterContext;
// NOTE: Snapshot the validDocIds before reading the numDocs to prevent the latest updates getting lost
ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
boolean applyValidDocIds = validDocIds != null && !QueryOptionsUtils.isSkipUpsert(_queryContext.getQueryOptions());
MutableRoaringBitmap validDocIdsSnapshot =
validDocIds != null && !_queryContext.isSkipUpsert() ? validDocIds.getMutableRoaringBitmap() : null;
int numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();

FilterContext filter = _filter != null ? _filter : _queryContext.getFilter();
if (filter != null) {
BaseFilterOperator filterOperator = constructPhysicalOperator(filter);
if (applyValidDocIds) {
BaseFilterOperator validDocFilter =
new BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false, _numDocs);
return FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator, validDocFilter), _numDocs,
BaseFilterOperator filterOperator = constructPhysicalOperator(filter, numDocs);
if (validDocIdsSnapshot != null) {
BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
return FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator, validDocFilter), numDocs,
_queryContext.getDebugOptions());
} else {
return filterOperator;
}
} else if (applyValidDocIds) {
return new BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false, _numDocs);
} else if (validDocIdsSnapshot != null) {
return new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
} else {
return new MatchAllFilterOperator(_numDocs);
return new MatchAllFilterOperator(numDocs);
}
}

Expand Down Expand Up @@ -143,13 +142,13 @@ private boolean canApplyH3Index(Predicate predicate, FunctionContext function) {
/**
* Helper method to build the operator tree from the filter.
*/
private BaseFilterOperator constructPhysicalOperator(FilterContext filter) {
private BaseFilterOperator constructPhysicalOperator(FilterContext filter, int numDocs) {
switch (filter.getType()) {
case AND:
List<FilterContext> childFilters = filter.getChildren();
List<BaseFilterOperator> childFilterOperators = new ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilter);
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilter, numDocs);
if (childFilterOperator.isResultEmpty()) {
// Return empty filter operator if any of the child filter operator's result is empty
return EmptyFilterOperator.getInstance();
Expand All @@ -158,47 +157,46 @@ private BaseFilterOperator constructPhysicalOperator(FilterContext filter) {
childFilterOperators.add(childFilterOperator);
}
}
return FilterOperatorUtils.getAndFilterOperator(childFilterOperators, _numDocs,
_queryContext.getDebugOptions());
return FilterOperatorUtils.getAndFilterOperator(childFilterOperators, numDocs, _queryContext.getDebugOptions());
case OR:
childFilters = filter.getChildren();
childFilterOperators = new ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilter);
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilter, numDocs);
if (childFilterOperator.isResultMatchingAll()) {
// Return match all filter operator if any of the child filter operator matches all records
return new MatchAllFilterOperator(_numDocs);
return new MatchAllFilterOperator(numDocs);
} else if (!childFilterOperator.isResultEmpty()) {
// Remove child filter operators whose result is empty
childFilterOperators.add(childFilterOperator);
}
}
return FilterOperatorUtils.getOrFilterOperator(childFilterOperators, _numDocs, _queryContext.getDebugOptions());
return FilterOperatorUtils.getOrFilterOperator(childFilterOperators, numDocs, _queryContext.getDebugOptions());
case NOT:
childFilters = filter.getChildren();
assert childFilters.size() == 1;
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilters.get(0));
return FilterOperatorUtils.getNotFilterOperator(childFilterOperator, _numDocs, null);
BaseFilterOperator childFilterOperator = constructPhysicalOperator(childFilters.get(0), numDocs);
return FilterOperatorUtils.getNotFilterOperator(childFilterOperator, numDocs, null);
case PREDICATE:
Predicate predicate = filter.getPredicate();
ExpressionContext lhs = predicate.getLhs();
if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
if (canApplyH3Index(predicate, lhs.getFunction())) {
return new H3IndexFilterOperator(_indexSegment, predicate, _numDocs);
return new H3IndexFilterOperator(_indexSegment, predicate, numDocs);
}
// TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (IS_NULL,
// IS_NOT_NULL, TEXT_MATCH)
return new ExpressionFilterOperator(_indexSegment, predicate, _numDocs);
return new ExpressionFilterOperator(_indexSegment, predicate, numDocs);
} else {
String column = lhs.getIdentifier();
DataSource dataSource = _indexSegment.getDataSource(column);
PredicateEvaluator predicateEvaluator = _predicateEvaluatorMap.get(predicate);
if (predicateEvaluator != null) {
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, _numDocs);
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs);
}
switch (predicate.getType()) {
case TEXT_MATCH:
return new TextMatchFilterOperator(dataSource.getTextIndex(), (TextMatchPredicate) predicate, _numDocs);
return new TextMatchFilterOperator(dataSource.getTextIndex(), (TextMatchPredicate) predicate, numDocs);
case REGEXP_LIKE:
// FST Index is available only for rolled out segments. So, we use different evaluator for rolled out and
// consuming segments.
Expand All @@ -218,32 +216,32 @@ private BaseFilterOperator constructPhysicalOperator(FilterContext filter) {
dataSource.getDataSourceMetadata().getDataType());
}
_predicateEvaluatorMap.put(predicate, predicateEvaluator);
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, _numDocs);
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs);
case JSON_MATCH:
JsonIndexReader jsonIndex = dataSource.getJsonIndex();
Preconditions.checkState(jsonIndex != null, "Cannot apply JSON_MATCH on column: %s without json index",
column);
return new JsonMatchFilterOperator(jsonIndex, (JsonMatchPredicate) predicate, _numDocs);
return new JsonMatchFilterOperator(jsonIndex, (JsonMatchPredicate) predicate, numDocs);
case IS_NULL:
NullValueVectorReader nullValueVector = dataSource.getNullValueVector();
if (nullValueVector != null) {
return new BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, _numDocs);
return new BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, numDocs);
} else {
return EmptyFilterOperator.getInstance();
}
case IS_NOT_NULL:
nullValueVector = dataSource.getNullValueVector();
if (nullValueVector != null) {
return new BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, _numDocs);
return new BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, numDocs);
} else {
return new MatchAllFilterOperator(_numDocs);
return new MatchAllFilterOperator(numDocs);
}
default:
predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource.getDictionary(),
dataSource.getDataSourceMetadata().getDataType());
_predicateEvaluatorMap.put(predicate, predicateEvaluator);
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, _numDocs);
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs);
}
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext quer
private void applyQueryOptions(QueryContext queryContext) {
Map<String, String> queryOptions = queryContext.getQueryOptions();

// Set skipUpsert
queryContext.setSkipUpsert(QueryOptionsUtils.isSkipUpsert(queryOptions));

// Set maxExecutionThreads
int maxExecutionThreads;
Integer maxExecutionThreadsFromQuery = QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public class QueryContext {
private long _endTimeMs;
// Whether to enable prefetch for the query
private boolean _enablePrefetch;
// Whether to skip upsert for the query
private boolean _skipUpsert;
// Maximum number of threads used to execute the query
private int _maxExecutionThreads = InstancePlanMakerImplV2.DEFAULT_MAX_EXECUTION_THREADS;
// The following properties apply to group-by queries
Expand Down Expand Up @@ -294,6 +296,14 @@ public void setEnablePrefetch(boolean enablePrefetch) {
_enablePrefetch = enablePrefetch;
}

public boolean isSkipUpsert() {
return _skipUpsert;
}

public void setSkipUpsert(boolean skipUpsert) {
_skipUpsert = skipUpsert;
}

public int getMaxExecutionThreads() {
return _maxExecutionThreads;
}
Expand Down

0 comments on commit 5e7c005

Please sign in to comment.