Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.1: batcheval: estimate stats in EvalAddSSTable #36525

Merged
merged 1 commit into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 46 additions & 35 deletions pkg/storage/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)
Expand All @@ -47,37 +46,57 @@ func EvalAddSSTable(
// defer tracing.FinishSpan(span)
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

// Compute the stats for any existing data in the affected span. The sstable
// being ingested can overwrite all, some, or none of the existing kvs.
// (Note: the expected case is that it's none or, in the case of a retry of
// the request, all.) So subtract out the existing mvcc stats, and add back
// what they'll be after the sstable is ingested.
existingIter := batch.NewIterator(engine.IterOptions{UpperBound: args.EndKey})
defer existingIter.Close()
existingIter.Seek(mvccStartKey)
if ok, err := existingIter.Valid(); err != nil {
return result.Result{}, errors.Wrap(err, "computing existing stats")
} else if ok && existingIter.UnsafeKey().Less(mvccEndKey) {
log.Eventf(ctx, "target key range not empty, will merge existing data with sstable")
}
// This ComputeStats is cheap if the span is empty.
existingStats, err := existingIter.ComputeStats(mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
if err != nil {
return result.Result{}, errors.Wrap(err, "computing existing stats")
}
ms.Subtract(existingStats)
if log.V(2) {
log.Infof(ctx, "%s SST covers span containing %d existing keys: [%s, %s)", humanizeutil.IBytes(int64(len(args.Data))), existingStats.KeyCount, args.Key, args.EndKey)
}

// Verify that the keys in the sstable are within the range specified by the
// request header, verify the key-value checksums, and compute the new
// MVCCStats.
stats, err := verifySSTable(
existingIter, args.Data, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
args.Data, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
if err != nil {
return result.Result{}, errors.Wrap(err, "verifying sstable data")
}

// The above MVCCStats represents what is in this new SST.
//
// *If* the keys in the SST do not conflict with keys currently in this range,
// then adding the stats for this SST to the range stats should yield the
// correct overall stats.
//
// *However*, if the keys in this range *do* overlap with keys already in this
// range, then adding the SST semantically *replaces*, rather than adds, those
// keys, and the net effect on the stats is not so simple.
//
// To perfectly compute the correct net stats, you could a) determine the
// stats for the span of the existing range that this SST covers and subtract
// it from the range's stats, then b) use a merging iterator that reads from
// the SST and then underlying range and compute the stats of that merged
// span, and then add those stats back in. That would result in correct stats
// that reflect the merging semantics when the SST "shadows" an existing key.
//
// If the underlying range is mostly empty, this isn't terribly expensive --
// computing the existing stats to subtract is cheap, as there is little or no
// existing data to traverse and b) is also pretty cheap -- the merging
// iterator can quickly iterate the in-memory SST.
//
// However, if the underlying range is _not_ empty, then this is not cheap:
// recomputing its stats involves traversing lots of data, and iterating the
// merged iterator has to constantly go back and forth to the RocksDB-backed
// (cgo) iterator.
//
// If we assume that most SSTs don't shadow too many keys, then the error of
// simply adding the SST stats directly to the range stats is minimal. In the
// worst-case, when we retry a whole SST, then it could be overcounting the
// entire file, but we can hope that that is rare. In the worst case, it may
// cause splitting an under-filled range that would later merge when the
// over-count is fixed.
//
// We can indicate that these stats contain this estimation using the flag in
// the MVCC stats so that later re-computations will not be surprised to find
// any discrepancies.
//
// Callers can trigger such a re-computation to fixup any discrepancies (and
// remove the ContainsEstimates flag) after they are done ingesting files by
// sending an explicit recompute.
stats.ContainsEstimates = true
ms.Add(stats)

return result.Result{
Expand All @@ -91,7 +110,7 @@ func EvalAddSSTable(
}

func verifySSTable(
existingIter engine.SimpleIterator, data []byte, start, end engine.MVCCKey, nowNanos int64,
data []byte, start, end engine.MVCCKey, nowNanos int64,
) (enginepb.MVCCStats, error) {
// To verify every KV is a valid roachpb.KeyValue in the range [start, end)
// we a) pass a verify flag on the iterator so that as ComputeStatsGo calls
Expand All @@ -115,15 +134,7 @@ func verifySSTable(
}
}

// In the case that two iterators have an entry with the same key and
// timestamp, MultiIterator breaks ties by preferring later ones in the
// ordering. So it's important that the sstable iterator comes after the one
// for the existing data (because the sstable will overwrite it when
// ingested).
mergedIter := engine.MakeMultiIterator([]engine.SimpleIterator{existingIter, dataIter})
defer mergedIter.Close()

stats, err := engine.ComputeStatsGo(mergedIter, start, end, nowNanos)
stats, err := engine.ComputeStatsGo(dataIter, start, end, nowNanos)
if err != nil {
return stats, err
}
Expand Down
Loading