Improving rollback effectiveness #1776

Merged: 11 commits, Mar 2, 2023
222 changes: 214 additions & 8 deletions index/scorch/persister.go
@@ -25,6 +25,7 @@ import (
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync/atomic"
@@ -516,6 +517,20 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
return nil, nil, err
}

// Store the timestamp at which the current indexSnapshot was
// persisted; this is useful for spreading the numSnapshotsToKeep
// protected snapshots out over time rather than keeping consecutive
// epochs.
currTimeStamp := time.Now()
timeStampBinary, err := currTimeStamp.MarshalText()
if err != nil {
return nil, nil, err
}
err = metaBucket.Put(boltMetaDataTimeStamp, timeStampBinary)
if err != nil {
return nil, nil, err
}

// persist internal values
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
if err != nil {
@@ -682,6 +697,7 @@ var boltInternalKey = []byte{'i'}
var boltMetaDataKey = []byte{'m'}
var boltMetaDataSegmentTypeKey = []byte("type")
var boltMetaDataSegmentVersionKey = []byte("version")
var boltMetaDataTimeStamp = []byte("timeStamp")

func (s *Scorch) loadFromBolt() error {
return s.rootBolt.View(func(tx *bolt.Tx) error {
@@ -871,30 +887,129 @@ func (s *Scorch) removeOldData() {
// rollback'ability.
var NumSnapshotsToKeep = 1

// RollbackSamplingInterval controls how far back in history we look
// for rollback points.
// For example, a value of 10 minutes (with NumSnapshotsToKeep = 3)
// ensures that the protected snapshots are:
//
// the very latest snapshot (i.e. the current one),
// the snapshot persisted 10 minutes before the current one,
// the snapshot persisted 20 minutes before the current one.
//
// By default, however, this time-series way of protecting snapshots is
// disabled, and we protect the latest NumSnapshotsToKeep contiguous snapshots.
var RollbackSamplingInterval = 0 * time.Minute

// RollbackRetentionFactor controls what portion of the earlier rollback
// points to retain during an infrequent/sparse mutation scenario.
var RollbackRetentionFactor = float64(0.5)
Member (review comment): Not sure I understand the purpose of this.

Member Author (reply): My bad, I've updated the description which clarifies this part.
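For context, here is a minimal sketch (not part of this change) of how an embedding application might tune these exported knobs before opening an index; any config-based plumbing inside Scorch is assumed and not shown here.

```go
package main

import (
	"time"

	"github.com/blevesearch/bleve/v2/index/scorch"
)

func main() {
	// Keep three rollback points, spaced roughly ten minutes apart, and
	// retain about half of the older checkpoints during sparse mutations.
	scorch.NumSnapshotsToKeep = 3
	scorch.RollbackSamplingInterval = 10 * time.Minute
	scorch.RollbackRetentionFactor = 0.5

	// ... open or build the index as usual after this point.
}
```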


func getTimeSeriesSnapshots(maxDataPoints int, interval time.Duration,
snapshots []*snapshotMetaData) (int, map[uint64]time.Time) {
if interval == 0 {
return len(snapshots), map[uint64]time.Time{}
}
// rv maps the epoch of each selected snapshot to its timestamp; together
// they form the time series of snapshots, each separated by roughly
// rollbackSamplingInterval.
rv := make(map[uint64]time.Time)
// ptr is the index of the point most recently added to the time series;
// start with the oldest snapshot in the list.
ptr := len(snapshots) - 1
rv[snapshots[ptr].epoch] = snapshots[ptr].timeStamp
numSnapshotsProtected := 1

// traverse the list in reverse order, older timestamps to newer ones.
for i := ptr - 1; i >= 0; i-- {
// If snapshots[i] is more than RollbackSamplingInterval (compared in
// minutes) newer than the last selected point, then snapshots[i+1] was
// the last snapshot still within the interval; add its epoch to rv.
if snapshots[i].timeStamp.Sub(snapshots[ptr].timeStamp).Minutes() >
interval.Minutes() {
if _, ok := rv[snapshots[i+1].epoch]; !ok {
rv[snapshots[i+1].epoch] = snapshots[i+1].timeStamp
ptr = i + 1
numSnapshotsProtected++
}
} else if snapshots[i].timeStamp.Sub(snapshots[ptr].timeStamp).Minutes() ==
interval.Minutes() {
if _, ok := rv[snapshots[i].epoch]; !ok {
rv[snapshots[i].epoch] = snapshots[i].timeStamp
ptr = i
numSnapshotsProtected++
}
}

if numSnapshotsProtected >= maxDataPoints {
break
}
}
return ptr, rv
}
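A minimal in-package sketch (hypothetical, in the spirit of rollback_test.go below) of what this selection looks like; the epochs and timestamps are made up, and the snapshots are ordered newest-first, the way rootBoltSnapshotMetaData returns them.

```go
// sketchTimeSeriesSelection illustrates getTimeSeriesSnapshots; it is not
// part of the change and only compiles inside package scorch.
func sketchTimeSeriesSelection() (int, map[uint64]time.Time) {
	now := time.Now()
	snapshots := []*snapshotMetaData{
		{epoch: 100, timeStamp: now},
		{epoch: 90, timeStamp: now.Add(-3 * time.Minute)},
		{epoch: 80, timeStamp: now.Add(-10 * time.Minute)},
		{epoch: 70, timeStamp: now.Add(-20 * time.Minute)},
	}
	// The walk starts at the oldest entry (epoch 70) and moves toward newer
	// ones, keeping one snapshot per 10-minute step: epochs 70, 80 and 100
	// end up protected, and the returned index (0 here) is the position of
	// the newest snapshot added to the series.
	return getTimeSeriesSnapshots(3, 10*time.Minute, snapshots)
}
```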

// getProtectedSnapshots aims to fetch the epochs to protect on a timestamp
// basis. It tries to get numSnapshotsToKeep snapshots, each of which is
// separated by a time duration of rollbackSamplingInterval.
func getProtectedSnapshots(rollbackSamplingInterval time.Duration,
numSnapshotsToKeep int,
persistedSnapshots []*snapshotMetaData) map[uint64]time.Time {

lastPoint, protectedEpochs := getTimeSeriesSnapshots(numSnapshotsToKeep,
rollbackSamplingInterval, persistedSnapshots)
if len(protectedEpochs) < numSnapshotsToKeep {
numSnapshotsNeeded := numSnapshotsToKeep - len(protectedEpochs)
// protect contiguous snapshots, newest first, up to (but not including)
// the last point covered by the time series
for i := 0; i < numSnapshotsNeeded && i < lastPoint; i++ {
protectedEpochs[persistedSnapshots[i].epoch] = persistedSnapshots[i].timeStamp
}
}

return protectedEpochs
}

func newCheckPoints(snapshots map[uint64]time.Time) []*snapshotMetaData {
rv := make([]*snapshotMetaData, 0)

keys := make([]uint64, 0, len(snapshots))
for k := range snapshots {
keys = append(keys, k)
}

sort.SliceStable(keys, func(i, j int) bool {
return snapshots[keys[i]].Sub(snapshots[keys[j]]) > 0
})

for _, key := range keys {
rv = append(rv, &snapshotMetaData{
epoch: key,
timeStamp: snapshots[key],
})
}

return rv
}
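Another small in-package sketch (not part of the change): newCheckPoints turns the protected map back into a slice ordered newest-first, which removeOldBoltSnapshots below stores on s.checkPoints for the next cleanup cycle.

```go
// sketchCheckPointOrder is hypothetical and only compiles inside package scorch.
func sketchCheckPointOrder() []*snapshotMetaData {
	now := time.Now()
	cps := newCheckPoints(map[uint64]time.Time{
		10:  now.Add(-20 * time.Minute),
		80:  now.Add(-10 * time.Minute),
		100: now,
	})
	// cps[0].epoch == 100, cps[1].epoch == 80, cps[2].epoch == 10
	return cps
}
```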

// Removes enough snapshots from the rootBolt so that the
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
persistedEpochs, err := s.RootBoltSnapshotEpochs()
persistedSnapshots, err := s.rootBoltSnapshotMetaData()
if err != nil {
return 0, err
}

if len(persistedEpochs) <= s.numSnapshotsToKeep {
if len(persistedSnapshots) <= s.numSnapshotsToKeep {
// we need to keep everything
return 0, nil
}

// make a map of epochs to protect from deletion
protectedEpochs := make(map[uint64]struct{}, s.numSnapshotsToKeep)
for _, epoch := range persistedEpochs[0:s.numSnapshotsToKeep] {
protectedEpochs[epoch] = struct{}{}
}
protectedSnapshots := getProtectedSnapshots(s.rollbackSamplingInterval,
s.numSnapshotsToKeep, persistedSnapshots)

var epochsToRemove []uint64
var newEligible []uint64
s.rootLock.Lock()
for _, epoch := range s.eligibleForRemoval {
if _, ok := protectedEpochs[epoch]; ok {
if _, ok := protectedSnapshots[epoch]; ok {
// protected
newEligible = append(newEligible, epoch)
} else {
@@ -903,6 +1018,7 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
}
s.eligibleForRemoval = newEligible
s.rootLock.Unlock()
s.checkPoints = newCheckPoints(protectedSnapshots)

if len(epochsToRemove) == 0 {
return 0, nil
@@ -996,6 +1112,96 @@ func (s *Scorch) removeOldZapFiles() error {
return nil
}

// In a sparse mutation scenario, it can happen that all protected
// snapshots are older than the numSnapshotsToKeep * rollbackSamplingInterval
// duration. That would purge all of them from the boltDB, and the next
// iteration of removeOldData() would end up protecting only the latest
// contiguous snapshots, which is a poor pattern for rollback checkpoints.
// Hence, in such a scenario, the following function tries to retain at
// least a retentionFactor portion worth of the old snapshots.
func getBoundaryCheckPoint(retentionFactor float64,
checkPoints []*snapshotMetaData, timeStamp time.Time) time.Time {
if checkPoints != nil {
boundary := checkPoints[int(math.Floor(float64(len(checkPoints))*
retentionFactor))]
if timeStamp.Sub(boundary.timeStamp) < 0 {
// otherwise too few checkPoints would be left.
return boundary.timeStamp
}
}
return timeStamp
}
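A short in-package sketch (hypothetical values, not part of the change) of the arithmetic above: with a retention factor of 0.5 and four checkpoints carried over from the previous cycle, the boundary is checkPoints[2]; an expiring snapshot older than that boundary yields the boundary's timestamp as the effective cutoff, otherwise its own timestamp is returned unchanged.

```go
// sketchBoundaryCutoff only compiles inside package scorch.
func sketchBoundaryCutoff() time.Time {
	now := time.Now()
	checkPoints := []*snapshotMetaData{
		{epoch: 100, timeStamp: now},
		{epoch: 80, timeStamp: now.Add(-10 * time.Minute)},
		{epoch: 60, timeStamp: now.Add(-20 * time.Minute)},
		{epoch: 40, timeStamp: now.Add(-30 * time.Minute)},
	}
	// Persisted 40 minutes ago, i.e. older than the boundary checkPoints[2]
	// (20 minutes ago), so the boundary's timestamp is returned and the
	// caller derives the new expiration window from it.
	expiring := now.Add(-40 * time.Minute)
	return getBoundaryCheckPoint(0.5, checkPoints, expiring)
}
```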

type snapshotMetaData struct {
epoch uint64
timeStamp time.Time
}

func (s *Scorch) rootBoltSnapshotMetaData() ([]*snapshotMetaData, error) {
var rv []*snapshotMetaData
currTime := time.Now()
expirationDuration := time.Duration(s.numSnapshotsToKeep) * s.rollbackSamplingInterval

err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil
}
sc := snapshots.Cursor()
var found bool
for sk, _ := sc.Last(); sk != nil; sk, _ = sc.Prev() {
_, snapshotEpoch, err := decodeUvarintAscending(sk)
if err != nil {
continue
}

if expirationDuration == 0 {
rv = append(rv, &snapshotMetaData{
epoch: snapshotEpoch,
})
continue
}

snapshot := snapshots.Bucket(sk)
metaBucket := snapshot.Bucket(boltMetaDataKey)
if metaBucket == nil {
continue
}
timeStampBytes := metaBucket.Get(boltMetaDataTimeStamp)
var timeStamp time.Time
err = timeStamp.UnmarshalText(timeStampBytes)
if err != nil {
continue
}
// Don't keep snapshots older than
// expiration duration (numSnapshotsToKeep *
// rollbackSamplingInterval, by default)
if currTime.Sub(timeStamp) <= expirationDuration {
rv = append(rv, &snapshotMetaData{
epoch: snapshotEpoch,
timeStamp: timeStamp,
})
} else {
if !found {
found = true
boundary := getBoundaryCheckPoint(s.rollbackRetentionFactor,
s.checkPoints, timeStamp)
expirationDuration = currTime.Sub(boundary)
continue
}
k := encodeUvarintAscending(nil, snapshotEpoch)
err = snapshots.DeleteBucket(k)
if err == bolt.ErrBucketNotFound {
err = nil
}
}

}
return nil
})
return rv, err
}

func (s *Scorch) RootBoltSnapshotEpochs() ([]uint64, error) {
var rv []uint64
err := s.rootBolt.View(func(tx *bolt.Tx) error {
105 changes: 105 additions & 0 deletions index/scorch/rollback_test.go
@@ -16,6 +16,7 @@ package scorch

import (
"testing"
"time"

"github.com/blevesearch/bleve/v2/document"
index "github.com/blevesearch/bleve_index_api"
@@ -246,3 +247,107 @@ func TestIndexRollback(t *testing.T) {
t.Fatal(err)
}
}

func TestGetProtectedSnapshots(t *testing.T) {
origRollbackSamplingInterval := RollbackSamplingInterval
defer func() {
RollbackSamplingInterval = origRollbackSamplingInterval
}()
RollbackSamplingInterval = 10 * time.Minute
currentTimeStamp := time.Now()
tests := []struct {
title string
metaData []*snapshotMetaData
numSnapshotsToKeep int
expCount int
expEpochs []uint64
}{
{
title: "epochs that have exact timestamps as per expectation for protecting",
metaData: []*snapshotMetaData{
{epoch: 100, timeStamp: currentTimeStamp},
{epoch: 99, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 12))},
{epoch: 88, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 6))},
{epoch: 50, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval))},
{epoch: 35, timeStamp: currentTimeStamp.Add(-(6 * RollbackSamplingInterval / 5))},
{epoch: 10, timeStamp: currentTimeStamp.Add(-(2 * RollbackSamplingInterval))},
},
numSnapshotsToKeep: 3,
expCount: 3,
expEpochs: []uint64{100, 50, 10},
},
{
title: "epochs that have exact timestamps as per expectation for protecting",
metaData: []*snapshotMetaData{
{epoch: 100, timeStamp: currentTimeStamp},
{epoch: 99, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 12))},
{epoch: 88, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 6))},
{epoch: 50, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval))},
{epoch: 35, timeStamp: currentTimeStamp.Add(-(6 * RollbackSamplingInterval / 5))},
{epoch: 10, timeStamp: currentTimeStamp.Add(-(2 * RollbackSamplingInterval))},
},
numSnapshotsToKeep: 2,
expCount: 2,
expEpochs: []uint64{50, 10},
},
{
title: "epochs that have timestamps approximated to the expected value",
metaData: []*snapshotMetaData{
{epoch: 100, timeStamp: currentTimeStamp},
{epoch: 99, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 12))},
{epoch: 88, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 6))},
{epoch: 50, timeStamp: currentTimeStamp.Add(-(3 * RollbackSamplingInterval / 4))},
{epoch: 35, timeStamp: currentTimeStamp.Add(-(6 * RollbackSamplingInterval / 5))},
{epoch: 10, timeStamp: currentTimeStamp.Add(-(2 * RollbackSamplingInterval))},
},
numSnapshotsToKeep: 3,
expCount: 3,
expEpochs: []uint64{50, 35, 10},
},
{
title: "protecting epochs when we don't have enough snapshots with RollbackSamplingInterval" +
" separated timestamps",
metaData: []*snapshotMetaData{
{epoch: 100, timeStamp: currentTimeStamp},
{epoch: 99, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 12))},
{epoch: 88, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 6))},
{epoch: 50, timeStamp: currentTimeStamp.Add(-(3 * RollbackSamplingInterval / 4))},
{epoch: 35, timeStamp: currentTimeStamp.Add(-(5 * RollbackSamplingInterval / 6))},
{epoch: 10, timeStamp: currentTimeStamp.Add(-(7 * RollbackSamplingInterval / 8))},
},
numSnapshotsToKeep: 4,
expCount: 4,
expEpochs: []uint64{100, 99, 88, 10},
},
{
title: "epochs of which some are approximated to the expected timestamps, and" +
" we don't have enough snapshots with RollbackSamplingInterval separated timestamps",
metaData: []*snapshotMetaData{
{epoch: 100, timeStamp: currentTimeStamp},
{epoch: 99, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 12))},
{epoch: 88, timeStamp: currentTimeStamp.Add(-(RollbackSamplingInterval / 6))},
{epoch: 50, timeStamp: currentTimeStamp.Add(-(3 * RollbackSamplingInterval / 4))},
{epoch: 35, timeStamp: currentTimeStamp.Add(-(8 * RollbackSamplingInterval / 7))},
{epoch: 10, timeStamp: currentTimeStamp.Add(-(6 * RollbackSamplingInterval / 5))},
},
numSnapshotsToKeep: 3,
expCount: 3,
expEpochs: []uint64{100, 50, 10},
},
}

for i, test := range tests {
protectedEpochs := getProtectedSnapshots(RollbackSamplingInterval,
test.numSnapshotsToKeep, test.metaData)
if len(protectedEpochs) != test.expCount {
t.Errorf("%d test: %s, getProtectedSnapshots expected to return %d "+
"snapshots, but got: %d", i, test.title, test.expCount, len(protectedEpochs))
}
for _, e := range test.expEpochs {
if _, found := protectedEpochs[e]; !found {
t.Errorf("%d test: %s, %d epoch expected to be protected, "+
"but missing from protected list: %v", i, test.title, e, protectedEpochs)
}
}
}
}