Skip to content

Commit

Permalink
Correcting bytes read value with respect to metadata loading (#1769)
Browse files Browse the repository at this point in the history
* introducing flags to avoid double counting

* minor code cleanup

* fixed a data race problem
 - a method was receiving the struct by value rather than by
   reference; passing by value copies the struct, which reads the
   shared variable, and that read races with writes happening
   elsewhere.

* updating tests to reflect the metadata accounting

* changed int types for vars
  • Loading branch information
Thejas-bhat authored Dec 15, 2022
1 parent a1f2b3e commit 37ed84b
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 9 deletions.
3 changes: 3 additions & 0 deletions index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) {
deleted: segmentSnapshot.deleted,
cachedDocs: segmentSnapshot.cachedDocs,
creator: "introducePersist",
mmaped: 1,
}
newIndexSnapshot.segment[i] = newSegmentSnapshot
delete(persist.persisted, segmentSnapshot.id)
Expand Down Expand Up @@ -413,13 +414,15 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// deleted by the time we reach here, can skip the introduction.
if nextMerge.new != nil &&
nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {

// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
deleted: newSegmentDeleted,
cachedDocs: &cachedDocs{cache: nil},
creator: "introduceMerge",
mmaped: nextMerge.mmaped,
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
Expand Down
2 changes: 2 additions & 0 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
oldNewDocNums: oldNewDocNums,
new: seg,
notifyCh: make(chan *mergeTaskIntroStatus),
mmaped: 1,
}

s.fireEvent(EventKindMergeTaskIntroductionStart, 0)
Expand Down Expand Up @@ -429,6 +430,7 @@ type segmentMerge struct {
oldNewDocNums map[uint64][]uint64
new segment.Segment
notifyCh chan *mergeTaskIntroStatus
mmaped uint32
}

func cumulateBytesRead(sbs []segment.Segment) uint64 {
Expand Down
11 changes: 9 additions & 2 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,15 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field
if rv.dicts == nil {
rv.dicts = make([]segment.TermDictionary, len(is.segment))
for i, s := range is.segment {
segBytesRead := s.segment.BytesRead()
rv.incrementBytesRead(segBytesRead)
// the intention behind this compare-and-swap operation is
// to make sure that the accounting of the metadata happens
// only once (corresponding to this persisted segment's most
// recent segPlugin.Open() call), so that subsequent queries won't
// incur this cost again, which would amount to double counting.
if atomic.CompareAndSwapUint32(&s.mmaped, 1, 0) {
segBytesRead := s.segment.BytesRead()
rv.incrementBytesRead(segBytesRead)
}
dict, err := s.segment.Dictionary(field)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion index/scorch/snapshot_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ var TermSeparator byte = 0xff
var TermSeparatorSplitSlice = []byte{TermSeparator}

type SegmentSnapshot struct {
// this flag is needed to identify whether this
// segment was mmaped recently, in which case
// we consider the loading cost of the metadata
// as part of IO stats.
mmaped uint32
id uint64
segment segment.Segment
deleted *roaring.Bitmap
Expand All @@ -54,7 +59,7 @@ func (s *SegmentSnapshot) FullSize() int64 {
return int64(s.segment.Count())
}

func (s SegmentSnapshot) LiveSize() int64 {
// LiveSize returns the segment's live document count as an int64
// (presumably Count() excludes documents marked in the deleted
// bitmap — confirm against SegmentSnapshot.Count's definition).
// NOTE(review): the receiver is a pointer deliberately — per the
// commit message, a value receiver copied the struct and the copy's
// read of the shared mmaped flag raced with atomic writes elsewhere.
func (s *SegmentSnapshot) LiveSize() int64 {
	return int64(s.Count())
}

Expand Down
12 changes: 6 additions & 6 deletions index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ func TestBytesRead(t *testing.T) {
}
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 206545 && res.BytesRead == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for fuzzy query is 206545, got %v",
if bytesRead-prevBytesRead != 8468 && res.BytesRead == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for fuzzy query is 8468, got %v",
bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead
Expand Down Expand Up @@ -466,8 +466,8 @@ func TestBytesRead(t *testing.T) {

stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 54945 && res.BytesRead == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for numeric range query is 54945, got %v",
if bytesRead-prevBytesRead != 924 && res.BytesRead == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for numeric range query is 924, got %v",
bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead
Expand Down Expand Up @@ -498,8 +498,8 @@ func TestBytesRead(t *testing.T) {
// since it's created afresh and not reused
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 18090 && res.BytesRead == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for disjunction query is 18090, got %v",
if bytesRead-prevBytesRead != 83 && res.BytesRead == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for disjunction query is 83, got %v",
bytesRead-prevBytesRead)
}
}
Expand Down

0 comments on commit 37ed84b

Please sign in to comment.