Skip to content

Commit

Permalink
MB-63736: "EligibleDocumentMatchHandler" is for local purposes only (#…
Browse files Browse the repository at this point in the history
…2084)

+ This _MUST NOT_ be replaced with the document handler from the calling
application's context which could contain the streaming document match
handler used by result streamers (like n1fty). The bug in the current
code will cause bleve to prematurely stream interim eligible docs to the
listener which is unintended at best.

+ Also a side optimization along allocation in the same code path.
  • Loading branch information
abhinavdangeti authored Oct 8, 2024
1 parent 5bb215c commit fab6e1e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 53 deletions.
67 changes: 39 additions & 28 deletions search/collector/eligible.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package collector

import (
"context"
"fmt"
"time"

"github.com/blevesearch/bleve/v2/search"
Expand All @@ -37,38 +38,48 @@ func NewEligibleCollector(size int) *EligibleCollector {

func newEligibleCollector(size int) *EligibleCollector {
// No sort order & skip always 0 since this is only to filter eligible docs.
hc := &EligibleCollector{size: size}
ec := &EligibleCollector{size: size}

// comparator is a dummy here
hc.store = getOptimalCollectorStore(size, 0, func(i, j *search.DocumentMatch) int {
ec.store = getOptimalCollectorStore(size, 0, func(i, j *search.DocumentMatch) int {
return 0
})

return hc
return ec
}

func (hc *EligibleCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
func makeEligibleDocumentMatchHandler(ctx *search.SearchContext) (search.DocumentMatchHandler, error) {
if ec, ok := ctx.Collector.(*EligibleCollector); ok {
return func(d *search.DocumentMatch) error {
if d == nil {
return nil
}

// No elements removed from the store here.
_ = ec.store.Add(d)
return nil
}, nil
}

return nil, fmt.Errorf("eligiblity collector not available")
}

func (ec *EligibleCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
startTime := time.Now()
var err error
var next *search.DocumentMatch

backingSize := hc.size
backingSize := ec.size
if backingSize > PreAllocSizeSkipCap {
backingSize = PreAllocSizeSkipCap + 1
}
searchContext := &search.SearchContext{
DocumentMatchPool: search.NewDocumentMatchPool(backingSize+searcher.DocumentMatchPoolSize(), 0),
Collector: hc,
Collector: ec,
IndexReader: reader,
}

dmHandlerMaker := MakeEligibleDocumentMatchHandler
if cv := ctx.Value(search.MakeDocumentMatchHandlerKey); cv != nil {
dmHandlerMaker = cv.(search.MakeDocumentMatchHandler)
}
// use the application given builder for making the custom document match
// handler and perform callbacks/invocations on the newly made handler.
dmHandler, _, err := dmHandlerMaker(searchContext)
dmHandler, err := makeEligibleDocumentMatchHandler(searchContext)
if err != nil {
return err
}
Expand All @@ -80,15 +91,15 @@ func (hc *EligibleCollector) Collect(ctx context.Context, searcher search.Search
next, err = searcher.Next(searchContext)
}
for err == nil && next != nil {
if hc.total%CheckDoneEvery == 0 {
if ec.total%CheckDoneEvery == 0 {
select {
case <-ctx.Done():
search.RecordSearchCost(ctx, search.AbortM, 0)
return ctx.Err()
default:
}
}
hc.total++
ec.total++

err = dmHandler(next)
if err != nil {
Expand All @@ -109,48 +120,48 @@ func (hc *EligibleCollector) Collect(ctx context.Context, searcher search.Search
}

// compute search duration
hc.took = time.Since(startTime)
ec.took = time.Since(startTime)

// finalize actual results
err = hc.finalizeResults(reader)
err = ec.finalizeResults(reader)
if err != nil {
return err
}
return nil
}

func (hc *EligibleCollector) finalizeResults(r index.IndexReader) error {
func (ec *EligibleCollector) finalizeResults(r index.IndexReader) error {
var err error
hc.results, err = hc.store.Final(0, func(doc *search.DocumentMatch) error {
ec.results, err = ec.store.Final(0, func(doc *search.DocumentMatch) error {
// Adding the results to the store without any modifications since we don't
// require the external ID of the filtered hits.
return nil
})
return err
}

func (hc *EligibleCollector) Results() search.DocumentMatchCollection {
return hc.results
func (ec *EligibleCollector) Results() search.DocumentMatchCollection {
return ec.results
}

func (hc *EligibleCollector) Total() uint64 {
return hc.total
func (ec *EligibleCollector) Total() uint64 {
return ec.total
}

// No concept of scoring in the eligible collector.
func (hc *EligibleCollector) MaxScore() float64 {
func (ec *EligibleCollector) MaxScore() float64 {
return 0
}

func (hc *EligibleCollector) Took() time.Duration {
return hc.took
func (ec *EligibleCollector) Took() time.Duration {
return ec.took
}

func (hc *EligibleCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
func (ec *EligibleCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
// facet unsupported for pre-filtering in KNN search
}

func (hc *EligibleCollector) FacetResults() search.FacetResults {
func (ec *EligibleCollector) FacetResults() search.FacetResults {
// facet unsupported for pre-filtering in KNN search
return nil
}
21 changes: 0 additions & 21 deletions search/collector/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,27 +386,6 @@ func (hc *TopNCollector) prepareDocumentMatch(ctx *search.SearchContext,
return nil
}

// Unlike TopNDocHandler, this will not eliminate docs based on score.
func MakeEligibleDocumentMatchHandler(
ctx *search.SearchContext) (search.DocumentMatchHandler, bool, error) {

var hc *EligibleCollector
var ok bool

if hc, ok = ctx.Collector.(*EligibleCollector); ok {
return func(d *search.DocumentMatch) error {
if d == nil {
return nil
}

// No elements removed from the store here.
_ = hc.store.Add(d)
return nil
}, false, nil
}
return nil, false, nil
}

func MakeTopNDocumentMatchHandler(
ctx *search.SearchContext) (search.DocumentMatchHandler, bool, error) {
var hc *TopNCollector
Expand Down
10 changes: 6 additions & 4 deletions search_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,13 @@ func (i *indexImpl) runKnnCollector(ctx context.Context, req *SearchRequest, rea
return nil, err
}
filterHits := filterColl.Results()
filterHitsMap[idx] = make([]index.IndexInternalID, 0)
for _, docMatch := range filterHits {
filterHitsMap[idx] = append(filterHitsMap[idx], docMatch.IndexInternalID)
if len(filterHits) > 0 {
filterHitsMap[idx] = make([]index.IndexInternalID, len(filterHits))
for _, docMatch := range filterHits {
filterHitsMap[idx] = append(filterHitsMap[idx], docMatch.IndexInternalID)
}
requiresFiltering[idx] = true
}
requiresFiltering[idx] = true
}

// Add the filter hits when creating the kNN query
Expand Down

0 comments on commit fab6e1e

Please sign in to comment.