Skip to content

Commit

Permalink
db: add DB.Excise
Browse files Browse the repository at this point in the history
Add an Excise method to the DB allowing an excise without an accompanying
ingestion. This will be used in CockroachDB to recover from corruption by
excising out corrupted sstables.

For now, this implementation uses DB.ingest. A future refactor should be able
to clean up these code paths and potentially make excise independent of the
ingest codepath. Today, it's necessary to reuse the ingest logic in order to
perform excises as 'flushable ingests.'

Closes #4286.
  • Loading branch information
jbowens committed Mar 4, 2025
1 parent 5b12eaf commit d8cf203
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 83 deletions.
2 changes: 1 addition & 1 deletion checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {

// Hacky but the command doesn't expect a db string. Get rid of it.
td.CmdArgs = td.CmdArgs[1:]
if err := runIngestAndExciseCmd(td, d, mem); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
return ""
Expand Down
2 changes: 1 addition & 1 deletion compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,7 @@ func TestCompactionPickerPickFile(t *testing.T) {
return ""

case "ingest-and-excise":
if err := runIngestAndExciseCmd(td, d, d.opts.FS); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
return ""
Expand Down
13 changes: 12 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,18 @@ func (d *DB) waitTableStats() {
}
}

func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
// runExciseCmd implements the datadriven "excise" command. The command must be
// given exactly two arguments; the string form of the first is used as the
// start key and the string form of the second as the end key of the KeyRange
// handed to d.Excise. Any error returned by d.Excise is passed back to the
// caller unchanged.
func runExciseCmd(td *datadriven.TestData, d *DB) error {
	args := td.CmdArgs
	if len(args) != 2 {
		return errors.New("excise expects two arguments: <start-key> <end-key>")
	}
	var span KeyRange
	span.Start = []byte(args[0].String())
	span.End = []byte(args[1].String())
	return d.Excise(context.Background(), span)
}

func runIngestAndExciseCmd(td *datadriven.TestData, d *DB) error {
var exciseSpan KeyRange
paths := make([]string, 0, len(td.CmdArgs))
for i, arg := range td.CmdArgs {
Expand Down
9 changes: 8 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2198,6 +2198,13 @@ func TestDeterminism(t *testing.T) {
require.NoError(t, runBuildCmd(td, d, fs))
return ""
})
case "excise":
return addStep(td, func(td *datadriven.TestData) string {
if err := runExciseCmd(td, d); err != nil {
return err.Error()
}
return ""
})
case "flush":
return addStep(td, func(td *datadriven.TestData) string {
_, err := d.AsyncFlush()
Expand All @@ -2208,7 +2215,7 @@ func TestDeterminism(t *testing.T) {
})
case "ingest-and-excise":
return addStep(td, func(td *datadriven.TestData) string {
if err := runIngestAndExciseCmd(td, d, fs); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
return ""
Expand Down
31 changes: 31 additions & 0 deletions excise.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,42 @@ import (
"context"
"slices"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manifest"
)

// Excise atomically deletes all data overlapping with the provided span. All
// data overlapping with the span is removed, including from open snapshots.
// Only currently-open iterators will still observe the removed data. Excise may
// initiate a flush if there exists unflushed data overlapping the excise span.
//
// Excise returns ErrReadOnly if the DB's options mark it read-only, and an
// error if the store's format major version is older than
// FormatVirtualSSTables. It panics if d.closed holds an error. In invariants
// builds it additionally panics if either bound of span carries a suffix, i.e.
// if the comparer's Split does not return the bound's full length.
func (d *DB) Excise(ctx context.Context, span KeyRange) error {
	if err := d.closed.Load(); err != nil {
		panic(err)
	}
	if d.opts.ReadOnly {
		return ErrReadOnly
	}
	if invariants.Enabled {
		// Excise is only supported on prefix keys.
		if d.opts.Comparer.Split(span.Start) != len(span.Start) {
			panic("Excise called with suffixed start key")
		}
		if d.opts.Comparer.Split(span.End) != len(span.End) {
			panic("Excise called with suffixed end key")
		}
	}
	if v := d.FormatMajorVersion(); v < FormatVirtualSSTables {
		return errors.Errorf(
			"store has format major version %d; Excise requires at least %d",
			v, FormatVirtualSSTables,
		)
	}
	// An excise is performed through the ingest path with no files to ingest
	// and only an excise span. The ingest stats it returns are not meaningful
	// for a pure excise and are discarded.
	_, err := d.ingest(ctx, nil, nil, span, nil)
	return err
}

// excise updates ve to include a replacement of the file m with new virtual
// sstables that exclude exciseSpan, returning a slice of newly-created files if
// any. If the entirety of m is deleted by exciseSpan, no new sstables are added
Expand Down
107 changes: 56 additions & 51 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ func (d *DB) ingest(
return IngestOperationStats{}, err
}

if loadResult.fileCount() == 0 {
if loadResult.fileCount() == 0 && !exciseSpan.Valid() {
// All of the sstables to be ingested were empty. Nothing to do.
return IngestOperationStats{}, nil
}
Expand Down Expand Up @@ -1713,61 +1713,66 @@ func (d *DB) ingest(
}
}

info := TableIngestInfo{
JobID: int(jobID),
Err: err,
flushable: asFlushable,
}
if len(loadResult.local) > 0 {
info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
} else if len(loadResult.shared) > 0 {
info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
} else {
info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
}
// TODO(jackson): Refactor this so that the case where there are no files
// but a valid excise span is not so exceptional.

var stats IngestOperationStats
if ve != nil {
info.Tables = make([]struct {
TableInfo
Level int
}, len(ve.NewTables))
for i := range ve.NewTables {
e := &ve.NewTables[i]
info.Tables[i].Level = e.Level
info.Tables[i].TableInfo = e.Meta.TableInfo()
stats.Bytes += e.Meta.Size
if e.Level == 0 {
stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
}
if metaFlushableOverlaps[e.Meta.FileNum] {
stats.MemtableOverlappingFiles++
if loadResult.fileCount() > 0 {
info := TableIngestInfo{
JobID: int(jobID),
Err: err,
flushable: asFlushable,
}
if len(loadResult.local) > 0 {
info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
} else if len(loadResult.shared) > 0 {
info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
} else {
info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
}
if ve != nil {
info.Tables = make([]struct {
TableInfo
Level int
}, len(ve.NewTables))
for i := range ve.NewTables {
e := &ve.NewTables[i]
info.Tables[i].Level = e.Level
info.Tables[i].TableInfo = e.Meta.TableInfo()
stats.Bytes += e.Meta.Size
if e.Level == 0 {
stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
}
if metaFlushableOverlaps[e.Meta.FileNum] {
stats.MemtableOverlappingFiles++
}
}
}
} else if asFlushable {
// NB: If asFlushable == true, there are no shared sstables.
info.Tables = make([]struct {
TableInfo
Level int
}, len(loadResult.local))
for i, f := range loadResult.local {
info.Tables[i].Level = -1
info.Tables[i].TableInfo = f.TableInfo()
stats.Bytes += f.Size
// We don't have exact stats on which files will be ingested into
// L0, because actual ingestion into the LSM has been deferred until
// flush time. Instead, we infer based on memtable overlap.
//
// TODO(jackson): If we optimistically compute data overlap (#2112)
// before entering the commit pipeline, we can use that overlap to
// improve our approximation by incorporating overlap with L0, not
// just memtables.
if metaFlushableOverlaps[f.FileNum] {
stats.ApproxIngestedIntoL0Bytes += f.Size
stats.MemtableOverlappingFiles++
} else if asFlushable {
// NB: If asFlushable == true, there are no shared sstables.
info.Tables = make([]struct {
TableInfo
Level int
}, len(loadResult.local))
for i, f := range loadResult.local {
info.Tables[i].Level = -1
info.Tables[i].TableInfo = f.TableInfo()
stats.Bytes += f.Size
// We don't have exact stats on which files will be ingested into
// L0, because actual ingestion into the LSM has been deferred until
// flush time. Instead, we infer based on memtable overlap.
//
// TODO(jackson): If we optimistically compute data overlap (#2112)
// before entering the commit pipeline, we can use that overlap to
// improve our approximation by incorporating overlap with L0, not
// just memtables.
if metaFlushableOverlaps[f.FileNum] {
stats.ApproxIngestedIntoL0Bytes += f.Size
stats.MemtableOverlappingFiles++
}
}
}
d.opts.EventListener.TableIngested(info)
}
d.opts.EventListener.TableIngested(info)

return stats, err
}
Expand Down
24 changes: 8 additions & 16 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,25 +721,21 @@ func TestExcise(t *testing.T) {
return err.Error()
}
return ""

case "flush":
if err := d.Flush(); err != nil {
return err.Error()
}
return ""

case "block-flush":
d.mu.Lock()
d.mu.compact.flushing = true
d.mu.Unlock()
return ""

case "allow-flush":
d.mu.Lock()
d.mu.compact.flushing = false
d.mu.Unlock()
return ""

case "ingest":
noWait := td.HasArg("no-wait")
if !noWait {
Expand All @@ -761,7 +757,6 @@ func TestExcise(t *testing.T) {
return "memtable flushed"
}
return ""

case "ingest-and-excise":
var prevFlushableIngests uint64
noWait := td.HasArg("no-wait")
Expand All @@ -772,7 +767,7 @@ func TestExcise(t *testing.T) {
d.mu.Unlock()
}

if err := runIngestAndExciseCmd(td, d, mem); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
if noWait {
Expand All @@ -792,7 +787,6 @@ func TestExcise(t *testing.T) {
return "memtable flushed"
}
return ""

case "file-only-snapshot":
if len(td.CmdArgs) != 1 {
panic("insufficient args for file-only-snapshot command")
Expand All @@ -811,10 +805,8 @@ func TestExcise(t *testing.T) {
s := d.NewEventuallyFileOnlySnapshot(keyRanges)
efos[name] = s
return "ok"

case "get":
return runGetCmd(t, td, d)

case "iter":
opts := &IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Expand Down Expand Up @@ -849,10 +841,8 @@ func TestExcise(t *testing.T) {
}
iter, _ := reader.NewIter(opts)
return runIterCmd(td, iter, true)

case "lsm":
return runLSMCmd(td, d)

case "metrics":
// The asynchronous loading of table stats can change metrics, so
// wait for all the tables' stats to be loaded.
Expand All @@ -861,11 +851,14 @@ func TestExcise(t *testing.T) {
d.mu.Unlock()

return d.Metrics().StringForTests()

case "wait-pending-table-stats":
return runTableStatsCmd(td, d)

case "excise":
if err := runExciseCmd(td, d); err != nil {
return err.Error()
}
return ""
case "excise-dryrun":
ve := &versionEdit{
DeletedTables: map[deletedFileEntry]*tableMetadata{},
}
Expand Down Expand Up @@ -894,7 +887,6 @@ func TestExcise(t *testing.T) {
d.mu.versions.logUnlock()
d.mu.Unlock()
return fmt.Sprintf("would excise %d files, use ingest-and-excise to excise.\n%s", len(ve.DeletedTables), ve.DebugString(base.DefaultFormatter))

case "confirm-backing":
// Confirms that the files have the same FileBacking.
fileNums := make(map[base.FileNum]struct{})
Expand Down Expand Up @@ -1097,7 +1089,7 @@ func testIngestSharedImpl(
return ""

case "ingest-and-excise":
if err := runIngestAndExciseCmd(td, d, d.opts.FS); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
// Wait for a possible flush.
Expand Down Expand Up @@ -1595,7 +1587,7 @@ func TestConcurrentExcise(t *testing.T) {
d.mu.Lock()
prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount
d.mu.Unlock()
if err := runIngestAndExciseCmd(td, d, d.opts.FS); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
// Wait for a possible flush.
Expand Down
2 changes: 1 addition & 1 deletion metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestMetrics(t *testing.T) {
return s

case "ingest-and-excise":
if err := runIngestAndExciseCmd(td, d, d.opts.FS); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
return ""
Expand Down
2 changes: 1 addition & 1 deletion table_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestTableStats(t *testing.T) {
return ""

case "ingest-and-excise":
if err := runIngestAndExciseCmd(td, d, d.opts.FS); err != nil {
if err := runIngestAndExciseCmd(td, d); err != nil {
return err.Error()
}
// Wait for a possible flush.
Expand Down
9 changes: 9 additions & 0 deletions testdata/determinism
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,12 @@ run count=100
sequential( 0:define 1:memtable-info 2:batch 3:batch 4:memtable-info parallel( 5:batch 6:batch ) )
----
ok

excise a z
----
7:excise

run count=100
sequential( 0:define 1:memtable-info 2:batch 3:batch 4:memtable-info parallel( 5:batch 6:batch ) 7:excise )
----
ok
Loading

0 comments on commit d8cf203

Please sign in to comment.