
Commit d5d3d1a

craig[bot], tbg, and benesch committed
Merge #29646 #29677
29646: roachtest: better merge testing in clearrange r=benesch a=tschottdorf

Unfortunately, the method used to determine the range count is quite slow, since crdb_internal.ranges internally sends an RPC for each range to determine the leaseholder. Anecdotally, I've seen ~25% of the merges completed after less than 15 minutes. I know that it's slowing down over time, but @benesch will fix that.

Also throws in aggressive consistency checks so that when something goes out of sync, we find out right there.

Release note: None

29677: storage: preserve consistency when applying widening preemptive snapshots r=benesch a=benesch

Merges can cause preemptive snapshots that widen existing replicas. For example, consider the following sequence of events:

1. A replica of range A is removed from store S, but is not garbage collected.
2. Range A subsumes its right neighbor B.
3. Range A is re-added to store S.

In step 3, S will receive a preemptive snapshot for A that requires widening its existing replica, thanks to the intervening merge. Problematically, the code that checked whether this widening was possible, in Store.canApplySnapshotLocked, was incorrectly mutating the range descriptor in the snapshot header! Applying the snapshot would then fail to clear all of the data from the old incarnation of the replica, since the bounds on the range deletion tombstone were wrong. This often resulted in replica inconsistency. Moreover, the in-memory copy of the range descriptor would be incorrect until the next descriptor update, though this usually happened quickly, as the replica would apply the change replicas command, which updates the descriptor, soon after applying the preemptive snapshot.

To fix the problem, teach Store.canApplySnapshotLocked to make a copy of the range descriptor before it mutates it. To prevent regressions, add an assertion to the descriptor update path that a range's start key is never changed. With this assertion in place, but without the fix itself, TestStoreRangeMergeReadoptedLHSFollower reliably fails.

Fixes #29252.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
Co-authored-by: Nikhil Benesch <[email protected]>
3 parents 922422c + 5bd9941 + 3ced421 · commit d5d3d1a
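
The descriptor-aliasing bug described in #29677 boils down to mutating a struct through a shared pointer instead of working on a value copy. The following is a minimal, hypothetical Go sketch of that pitfall; the types and the particular mutation are simplified stand-ins for illustration, not CockroachDB's actual SnapshotRequest_Header or RangeDescriptor code.

    // aliasing_sketch.go: illustrates the pointer-aliasing pitfall behind #29677
    // using simplified stand-in types, not CockroachDB's real ones.
    package main

    import "fmt"

    // rangeDesc stands in for roachpb.RangeDescriptor: just the bounds we care about.
    type rangeDesc struct {
        StartKey, EndKey string
    }

    // snapHeader stands in for SnapshotRequest_Header; it carries a pointer to the
    // descriptor, so anyone who copies the pointer shares the same underlying struct.
    type snapHeader struct {
        Desc *rangeDesc
    }

    // buggyCheck mimics the old behavior: it grabs the pointer out of the header and
    // then adjusts the descriptor's bounds in place, silently mutating the header too.
    func buggyCheck(h *snapHeader, existingEnd string) rangeDesc {
        desc := h.Desc            // alias, not a copy
        desc.EndKey = existingEnd // mutates h.Desc as well!
        return *desc
    }

    // fixedCheck mimics the fix: dereference to copy the struct by value before
    // mutating, so the descriptor in the snapshot header stays intact.
    func fixedCheck(h *snapHeader, existingEnd string) rangeDesc {
        desc := *h.Desc           // value copy
        desc.EndKey = existingEnd // local mutation only
        return desc
    }

    func main() {
        h := &snapHeader{Desc: &rangeDesc{StartKey: "a", EndKey: "c"}}
        _ = buggyCheck(h, "b")
        fmt.Println("after buggyCheck:", *h.Desc) // EndKey is now "b": the header was corrupted

        h = &snapHeader{Desc: &rangeDesc{StartKey: "a", EndKey: "c"}}
        _ = fixedCheck(h, "b")
        fmt.Println("after fixedCheck:", *h.Desc) // EndKey is still "c"
    }

The one-character change in pkg/storage/store_snapshot.go below (desc := *snapHeader.State.Desc) is the value-copy variant of this pattern.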

7 files changed (+92, -20 lines)

pkg/cmd/roachtest/clearrange.go (+68, -17)

@@ -23,6 +23,8 @@ import (
 )
 
 func registerClearRange(r *registry) {
+	const aggressiveConsistencyChecks = true
+
 	r.Add(testSpec{
 		Name:       `clearrange`,
 		MinVersion: `v2.1.0`,
@@ -52,7 +54,13 @@ func registerClearRange(r *registry) {
 	}
 
 	c.Put(ctx, cockroach, "./cockroach")
-	c.Start(ctx)
+	if aggressiveConsistencyChecks {
+		// Run with an env var that runs a synchronous consistency check after each rebalance and merge.
+		// This slows down merges, so it might hide some races.
+		c.Start(ctx, startArgs("--env=COCKROACH_CONSISTENCY_AGGRESSIVE=true"))
+	} else {
+		c.Start(ctx)
+	}
 
 	// Also restore a much smaller table. We'll use it to run queries against
 	// the cluster after having dropped the large table above, verifying that
@@ -68,6 +76,39 @@ func registerClearRange(r *registry) {
 
 	t.Status()
 
+	// Set up a convenience function that we can call to learn the number of
+	// ranges for the bank.bank table (even after it's been dropped).
+	numBankRanges := func() func() int {
+		conn := c.Conn(ctx, 1)
+		defer conn.Close()
+
+		var startHex string
+		// NB: set this to false to save yourself some time during development. Selecting
+		// from crdb_internal.ranges is very slow because it contacts all of the leaseholders.
+		// You may actually want to run a version of cockroach that doesn't do that because
+		// it'll still slow you down every time the method returned below is called.
+		if true {
+			if err := conn.QueryRow(
+				`SELECT to_hex(start_key) FROM crdb_internal.ranges WHERE "database" = 'bank' AND "table" = 'bank' ORDER BY start_key ASC LIMIT 1`,
+			).Scan(&startHex); err != nil {
+				t.Fatal(err)
+			}
+		} else {
+			startHex = "bd" // extremely likely to be the right thing (b'\275').
+		}
+		return func() int {
+			conn := c.Conn(ctx, 1)
+			defer conn.Close()
+			var n int
+			if err := conn.QueryRow(
+				`SELECT count(*) FROM crdb_internal.ranges WHERE substr(to_hex(start_key), 1, length($1::string)) = $1`, startHex,
+			).Scan(&n); err != nil {
+				t.Fatal(err)
+			}
+			return n
+		}
+	}()
+
 	m := newMonitor(ctx, c)
 	m.Go(func(ctx context.Context) error {
 		conn := c.Conn(ctx, 1)
@@ -77,6 +118,11 @@ func registerClearRange(r *registry) {
 			return err
 		}
 
+		// Merge as fast as possible to put maximum stress on the system.
+		if _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.range_merge.queue_interval = '0s'`); err != nil {
+			return err
+		}
+
 		t.WorkerStatus("dropping table")
 		defer t.WorkerStatus()
 
@@ -86,41 +132,46 @@ func registerClearRange(r *registry) {
 			return err
 		}
 
+		t.WorkerStatus("computing number of ranges")
+		initialBankRanges := numBankRanges()
+
+		t.WorkerStatus("dropping bank table")
 		if _, err := conn.ExecContext(ctx, `DROP TABLE bank.bank`); err != nil {
 			return err
 		}
 
-		// Spend a few minutes reading data with a timeout to make sure the
+		// Spend some time reading data with a timeout to make sure the
 		// DROP above didn't brick the cluster. At the time of writing,
-		// clearing all of the table data takes ~6min. We run for 2.5x that
-		// time to verify that nothing has gone wonky on the cluster.
-		//
-		// Don't lower this number, or the test may pass erroneously.
-		const minutes = 45
-		t.WorkerStatus("repeatedly running count(*) on small table")
-		for i := 0; i < minutes; i++ {
-			after := time.After(time.Minute)
+		// clearing all of the table data takes ~6min, so we want to run
+		// for at least a multiple of that duration.
+		const minDuration = 45 * time.Minute
+		deadline := timeutil.Now().Add(minDuration)
+		curBankRanges := numBankRanges()
+		t.WorkerStatus("waiting for ~", curBankRanges, " merges to complete (and for at least ", minDuration, " to pass)")
+		for timeutil.Now().Before(deadline) || curBankRanges > 1 {
+			after := time.After(5 * time.Minute)
+			curBankRanges = numBankRanges() // this call takes minutes, unfortunately
+			t.WorkerProgress(1 - float64(curBankRanges)/float64(initialBankRanges))
+
 			var count int
 			// NB: context cancellation in QueryRowContext does not work as expected.
 			// See #25435.
-			if _, err := conn.ExecContext(ctx, `SET statement_timeout = '10s'`); err != nil {
+			if _, err := conn.ExecContext(ctx, `SET statement_timeout = '5s'`); err != nil {
				return err
 			}
-			// If we can't aggregate over 80kb in 10s, the database is far from usable.
-			start := timeutil.Now()
+			// If we can't aggregate over 80kb in 5s, the database is far from usable.
 			if err := conn.QueryRowContext(ctx, `SELECT count(*) FROM tinybank.bank`).Scan(&count); err != nil {
 				return err
 			}
-			c.l.Printf("read %d rows in %0.1fs\n", count, timeutil.Since(start).Seconds())
-			t.WorkerProgress(float64(i+1) / float64(minutes))
+
 			select {
 			case <-after:
 			case <-ctx.Done():
 				return ctx.Err()
 			}
 		}
-		// TODO(benesch): verify that every last range in the table has been
-		// merged away. For now, just exercising the merge code is a good start.
+		// TODO(tschottdorf): verify that disk space usage drops below to <some small amount>, but that
+		// may not actually happen (see https://github.com/cockroachdb/cockroach/issues/29290).
 		return nil
 	})
 	m.Wait()

pkg/storage/consistency_queue.go (+3)

@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -33,6 +34,8 @@ var consistencyCheckInterval = settings.RegisterNonNegativeDurationSetting(
 	24*time.Hour,
 )
 
+var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)
+
 type consistencyQueue struct {
 	*baseQueue
 	interval func() time.Duration

pkg/storage/merge_queue.go (+5)

@@ -273,6 +273,11 @@ func (mq *mergeQueue) process(
 		// as purgatory-worthy.
 		return rangeMergePurgatoryError{err}
 	}
+	if testingAggressiveConsistencyChecks {
+		if err := mq.store.consistencyQueue.process(ctx, lhsRepl, sysCfg); err != nil {
+			log.Warning(ctx, err)
+		}
+	}
 	return nil
 }

pkg/storage/replica.go (+5)

@@ -1717,6 +1717,11 @@ func (r *Replica) setDescWithoutProcessUpdate(desc *roachpb.RangeDescriptor) {
 		log.Fatalf(ctx, "cannot replace initialized descriptor with uninitialized one: %+v -> %+v",
 			r.mu.state.Desc, desc)
 	}
+	if r.mu.state.Desc != nil && !r.mu.state.Desc.StartKey.Equal(desc.StartKey) {
+		ctx := r.AnnotateCtx(context.TODO())
+		log.Fatalf(ctx, "attempted to change replica's start key from %s to %s",
+			r.mu.state.Desc.StartKey, desc.StartKey)
+	}
 
 	newMaxID := maxReplicaID(desc)
 	if newMaxID > r.mu.lastReplicaAdded {

pkg/storage/replica_raftstorage.go (+3)

@@ -945,6 +945,9 @@ func (r *Replica) applySnapshot(
 	// Update the range and store stats.
 	r.store.metrics.subtractMVCCStats(*r.mu.state.Stats)
 	r.store.metrics.addMVCCStats(*s.Stats)
+	// TODO(benesch): the next line updates r.mu.state.Desc, but that's supposed
+	// to be handled by the call to setDescWithoutProcessUpdate below. This is not
+	// a correctness issue right now, but it's liable to become one.
 	r.mu.state = s
 	r.assertStateLocked(ctx, r.store.Engine())
 	r.mu.Unlock()

pkg/storage/replicate_queue.go (+5)

@@ -265,6 +265,11 @@ func (rq *replicateQueue) process(
 			// Enqueue this replica again to see if there are more changes to be made.
 			rq.MaybeAdd(repl, rq.store.Clock().Now())
 		}
+		if testingAggressiveConsistencyChecks {
+			if err := rq.store.consistencyQueue.process(ctx, repl, sysCfg); err != nil {
+				log.Warning(ctx, err)
+			}
+		}
 		return nil
 	}
 	return errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries)

pkg/storage/store_snapshot.go (+3, -3)

@@ -394,7 +394,7 @@ func (s *Store) canApplySnapshot(
 func (s *Store) canApplySnapshotLocked(
 	ctx context.Context, snapHeader *SnapshotRequest_Header,
 ) (*ReplicaPlaceholder, error) {
-	desc := snapHeader.State.Desc
+	desc := *snapHeader.State.Desc
 	if v, ok := s.mu.replicas.Load(int64(desc.RangeID)); ok && (*Replica)(v).IsInitialized() {
 		// We have an initialized replica. Preemptive snapshots can be applied with
 		// no further checks if they do not widen the existing replica. Raft
@@ -420,7 +420,7 @@ func (s *Store) canApplySnapshotLocked(
 
 	// TODO(benesch): consider discovering and GC'ing *all* overlapping ranges,
 	// not just the first one that getOverlappingKeyRangeLocked happens to return.
-	if exRange := s.getOverlappingKeyRangeLocked(desc); exRange != nil {
+	if exRange := s.getOverlappingKeyRangeLocked(&desc); exRange != nil {
 		// We have a conflicting range, so we must block the snapshot.
 		// When such a conflict exists, it will be resolved by one range
 		// either being split or garbage collected.
@@ -454,7 +454,7 @@ func (s *Store) canApplySnapshotLocked(
 	}
 
 	placeholder := &ReplicaPlaceholder{
-		rangeDesc: *desc,
+		rangeDesc: desc,
 	}
 	return placeholder, nil
 }
