storage: add permitLargeSnapshots flag to replica
In a privately reported user issue, we've seen that [our attempts](#7788)
at [preventing large snapshots](#7581)
can result in replica unavailability. Our current approach to limiting
large snapshots assumes that it's OK to block snapshots indefinitely
while waiting for a range to first split. Unfortunately, this can create
a dependency cycle where a range requires a snapshot to split (because it
can't achieve an up-to-date quorum without it) but isn't allowed to perform
a snapshot until its size is reduced below the threshold. This can result
in unavailability even when a majority of replicas remain live.

Currently, we still need this snapshot size limit because unbounded snapshots
can result in OOM errors that crash entire nodes. However, once snapshots
are streamed from disk to disk, never needing to buffer in-memory on the
sending or receiving side, we should be able to remove any snapshot size
limit (see #16954).

As a holdover, this change introduces a `permitLargeSnapshots` flag on a
replica which is set when the replica is too large to snapshot but observes
splits failing. When set, the flag allows snapshots to ignore the size
limit until the snapshot goes through and splits are able to succeed
again.
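
In outline, the flag ties together three pieces of the diff below:
`splitQueue.process` sets it when a split fails on an oversized range,
`Replica.GetSnapshot` consults it before rejecting a large snapshot, and
`Replica.SetMaxBytes` clears it whenever the split size is adjusted,
which includes the adjustment made when the split finally succeeds. The
standalone Go sketch below models that lifecycle; the `replica` type and
helper names are illustrative stand-ins, not the real `storage.Replica`
API:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// replica models only the fields the permitLargeSnapshots flag touches.
type replica struct {
	mu struct {
		sync.RWMutex
		maxBytes             int64
		size                 int64
		permitLargeSnapshots bool
	}
}

// exceedsDoubleSplitSizeLocked reports whether the replica has grown past
// twice its split size; callers must hold r.mu.
func (r *replica) exceedsDoubleSplitSizeLocked() bool {
	return r.mu.size > 2*r.mu.maxBytes
}

// getSnapshot mirrors the check in Replica.GetSnapshot: an oversized
// replica refuses to generate a snapshot unless the flag is set.
func (r *replica) getSnapshot() error {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.exceedsDoubleSplitSizeLocked() && !r.mu.permitLargeSnapshots {
		return errors.New("replica too large to snapshot")
	}
	return nil
}

// onSplitFailure mirrors splitQueue.process: a failed split on an
// oversized range unblocks large snapshots.
func (r *replica) onSplitFailure() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.exceedsDoubleSplitSizeLocked() {
		r.mu.permitLargeSnapshots = true
	}
}

// setMaxBytes mirrors Replica.SetMaxBytes: any adjustment of the split
// size (including after a successful split) resets the flag.
func (r *replica) setMaxBytes(maxBytes int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.maxBytes = maxBytes
	r.mu.permitLargeSnapshots = false
}

func main() {
	r := &replica{}
	r.setMaxBytes(1 << 16)
	r.mu.size = 3 << 16 // grown past twice the split size

	fmt.Println(r.getSnapshot()) // refused: replica too large
	r.onSplitFailure()           // a split fails for lack of quorum
	fmt.Println(r.getSnapshot()) // <nil>: large snapshot now permitted
	r.setMaxBytes(1 << 16)       // split size readjusted; limit restored
	fmt.Println(r.getSnapshot()) // refused again until the next failure
}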

Release note (bug fix): Fixed a scenario where a range that is too big
to snapshot can lose availability even with a majority of nodes alive.
nvanbenschoten committed Dec 20, 2017
1 parent 6adf047 commit 4f44c54
Showing 10 changed files with 251 additions and 12 deletions.
6 changes: 1 addition & 5 deletions pkg/storage/client_raft_test.go
@@ -3635,13 +3635,9 @@ func TestInitRaftGroupOnRequest(t *testing.T) {
t.Fatal("replica should not be nil for RHS range")
}

// TODO(spencer): Raft messages seem to turn up
// occasionally on restart, which initialize the replica, so
// this is not a test failure. Not sure how to work around this
// problem.
// Verify the raft group isn't initialized yet.
if repl.IsRaftGroupInitialized() {
log.Errorf(context.TODO(), "expected raft group to be uninitialized")
t.Fatal("expected raft group to be uninitialized")
}

// Send an increment and verify that initializes the Raft group.
170 changes: 167 additions & 3 deletions pkg/storage/client_split_test.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
@@ -789,7 +790,7 @@ func TestStoreRangeSplitStatsWithMerges(t *testing.T) {
// fillRange writes keys with the given prefix and associated values
// until bytes bytes have been written or the given range has split.
func fillRange(
store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64, t *testing.T,
t *testing.T, store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64,
) {
src := rand.New(rand.NewSource(0))
for {
@@ -802,7 +803,7 @@ func fillRange(
return
}
key := append(append([]byte(nil), prefix...), randutil.RandBytes(src, 100)...)
key = keys.MakeFamilyKey(key, 0)
key = keys.MakeFamilyKey(key, src.Uint32())
val := randutil.RandBytes(src, int(src.Int31n(1<<8)))
pArgs := putArgs(key, val)
_, pErr := client.SendWrappedWith(context.Background(), store, roachpb.Header{
@@ -861,7 +862,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
}

// Look in the range after prefix we're writing to.
fillRange(store, repl.RangeID, tableBoundary, maxBytes, t)
fillRange(t, store, repl.RangeID, tableBoundary, maxBytes)
}

// Verify that the range is in fact split.
@@ -912,6 +913,169 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
})
}

// TestStoreRangeSplitAfterLargeSnapshot tests a scenario where a range is too
// large to send a snapshot to a follower, but is unable to split because it cannot
// achieve quorum. The leader of the range should adapt to this, eventually
// permitting the large snapshot so that it can recover and then split
// successfully.
func TestStoreRangeSplitAfterLargeSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set maxBytes to something small so we can exceed the maximum snapshot
// size without adding 2x64MB of data.
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
})()

// Create a three node cluster.
sc := storage.TestStoreConfig(nil)
sc.RaftElectionTimeoutTicks = 1000000
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)
store0 := mtc.stores[0]
forAllLiveStores := func(f func(*storage.Store)) {
for _, store := range mtc.stores {
if store != nil {
f(store)
}
}
}

// The behindNode falls behind far enough to require a snapshot.
const behindNode = 1
// The crashingNode crashes after its single range becomes too large to
// snapshot.
const crashingNode = 2

// Wait for initial splits.
t.Log("waiting for initial splits")
forAllLiveStores(func(store *storage.Store) {
store.SetRaftSnapshotQueueActive(true)
store.SetSplitQueueActive(true)
store.ForceSplitScanAndProcess()
})
if err := server.WaitForInitialSplits(store0.DB()); err != nil {
t.Fatal(err)
}

// Then do a write; we'll use this to determine when the dust has settled.
t.Log("performing first write")
keyPrefix := append(keys.UserTableDataMin, []byte("key")...)
repl := store0.LookupReplica(roachpb.RKey(keyPrefix), nil)
rangeID := repl.RangeID
header := roachpb.Header{RangeID: rangeID}
incArgs := incrementArgs(keyPrefix, 1)
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}

// Replicate the range we'll play with to the other nodes.
t.Log("replicating range")
mtc.replicateRange(rangeID, behindNode, crashingNode)
mtc.waitForValues(keyPrefix, []int64{1, 1, 1})

// Fill the range without allowing splits so that it will try to split once
// the splitQueue is re-enabled. Fill it past the snapshot size limit
// enforced in Replica.GetSnapshot. We do this before stopping behindNode so
// that the quotaPool does not throttle progress.
t.Log("filling range")
forAllLiveStores(func(store *storage.Store) {
store.SetSplitQueueActive(false)
})
fillRange(t, store0, rangeID, keyPrefix, 2*maxBytes+1)

// Turn off replica scanner and snapshot queue. We'll control queues
// directly from now on.
forAllLiveStores(func(store *storage.Store) {
store.SetReplicaScannerActive(false)
store.SetRaftSnapshotQueueActive(false)
})

// Stop behindNode so it falls behind and will require a snapshot.
t.Log("letting one follower fall behind")
mtc.stopStore(behindNode)

// Let behindNode fall behind.
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyPrefix, []int64{2, 1, 2})

// Truncate the replica's log. This ensures that the only way behindNode can
// recover is through a snapshot.
index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
truncArgs := truncateLogArgs(index+1, rangeID)
truncArgs.Key = repl.Desc().StartKey.AsRawKey()
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, truncArgs); pErr != nil {
t.Fatal(pErr)
}

// The range can still make forward progress.
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyPrefix, []int64{3, 1, 3})

// Determine the range count.
prevRangeCount := store0.ReplicaCount()

// Stop crashingNode so that we lose quorum and can no longer split.
// Bring behindNode back up.
t.Log("killing the other follower")
mtc.stopStore(crashingNode)
mtc.restartStore(behindNode)

// Reactivate the split queues and reduce their process timeout so that
// they time out faster when quorum cannot be reached. Force a split, which
// should fail because it cannot achieve quorum. This in turn should set
// the permitLargeSnapshots flag.
t.Log("attempting a split without quorum; this should fail")
forAllLiveStores(func(store *storage.Store) {
store.SetSplitQueueProcessTimeout(1 * time.Second)
store.SetSplitQueueActive(true)
store.ForceSplitScanAndProcess()
})
testutils.SucceedsSoon(t, func() error {
if !repl.PermittingLargeSnapshots() {
return errors.Errorf("replica not permitting large snapshots")
}
return nil
})

// Now that the permitLargeSnapshot flag is set, we should see
// the range recover after behindNode is sent a snapshot.
t.Log("waiting for large snapshot to succeed")
forAllLiveStores(func(store *storage.Store) {
store.SetRaftSnapshotQueueActive(true)
store.ForceRaftSnapshotQueueProcess()
})
mtc.waitForValues(keyPrefix, []int64{3, 3, 3})

// Once the range has a majority of up-to-date nodes, it should be
// able to split. We first increment the manual clock to make sure
// any dangling intents left by previous splits expire.
t.Log("waiting for split to succeed")
mtc.manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1)
forAllLiveStores(func(store *storage.Store) {
store.ForceSplitScanAndProcess()
})
testutils.SucceedsSoon(t, func() error {
if store0.ReplicaCount() < prevRangeCount+1 {
return errors.Errorf("expected new range created by split")
}
return nil
})

// Per the contract on multiTestContext.stopStore, we need to restart the
// stopped store before calling multiTestContext.Stop.
mtc.restartStore(crashingNode)
}

// TestStoreRangeSystemSplits verifies that splits are based on the contents of
// the SystemConfig span.
func TestStoreRangeSystemSplits(t *testing.T) {
6 changes: 6 additions & 0 deletions pkg/storage/client_test.go
@@ -874,6 +874,11 @@ func (m *multiTestContext) stopStore(i int) {

m.mu.Lock()
m.stoppers[i] = nil
// Break the transport breaker for this node so that messages sent between a
// store stopping and that store restarting will never remain in-flight in
// the transport and end up reaching the store. This has been the cause of
// flakiness in the past.
m.transport.GetCircuitBreaker(m.idents[i].NodeID).Break()
m.senders[i].RemoveStore(m.stores[i])
m.stores[i] = nil
m.mu.Unlock()
@@ -905,6 +910,7 @@ func (m *multiTestContext) restartStore(i int) {
m.t.Fatal(err)
}
m.senders[i].AddStore(store)
m.transport.GetCircuitBreaker(m.idents[i].NodeID).Reset()
m.mu.Unlock()
cfg.NodeLiveness.StartHeartbeat(ctx, stopper, func(ctx context.Context) {
now := m.clock.Now()
14 changes: 14 additions & 0 deletions pkg/storage/helpers_test.go
@@ -176,6 +176,12 @@ func (s *Store) SetReplicaScannerActive(active bool) {
s.setScannerActive(active)
}

// SetSplitQueueProcessTimeout sets the timeout for processing a replica in the
// split queue.
func (s *Store) SetSplitQueueProcessTimeout(dur time.Duration) {
s.splitQueue.SetProcessTimeout(dur)
}

// GetOrCreateReplica passes through to its lowercase sibling.
func (s *Store) GetOrCreateReplica(
ctx context.Context,
@@ -347,6 +353,14 @@ func (r *Replica) GetTimestampCacheLowWater() hlc.Timestamp {
return t
}

// PermittingLargeSnapshots returns whether the replica is permitting large
// snapshots.
func (r *Replica) PermittingLargeSnapshots() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.permitLargeSnapshots
}

// GetRaftLogSize returns the raft log size.
func (r *Replica) GetRaftLogSize() int64 {
r.mu.Lock()
7 changes: 7 additions & 0 deletions pkg/storage/queue.go
@@ -302,6 +302,13 @@ func (bq *baseQueue) Disabled() bool {
return bq.mu.disabled
}

// SetProcessTimeout sets the timeout for processing a replica.
func (bq *baseQueue) SetProcessTimeout(dur time.Duration) {
bq.processMu.Lock()
bq.processTimeout = dur
bq.processMu.Unlock()
}

// Start launches a goroutine to process entries in the queue. The
// provided stopper is used to finish processing.
func (bq *baseQueue) Start(clock *hlc.Clock, stopper *stop.Stopper) {
16 changes: 13 additions & 3 deletions pkg/storage/replica.go
@@ -342,6 +342,14 @@ type Replica struct {
minLeaseProposedTS hlc.Timestamp
// Max bytes before split.
maxBytes int64
// Allow snapshots of any size instead of waiting for a split. Set to
// true when a split that is required for snapshots fails. Reset to
// false when the splits eventually succeed. The reasoning here is that
// in certain situations the split is dependent on the snapshot
// succeeding (either directly or transitively), so blocking the
// snapshot on the split can create a deadlock.
// TODO(nvanbenschoten): remove after #16954 is addressed.
permitLargeSnapshots bool
// proposals stores the Raft in-flight commands which
// originated at this Replica, i.e. all commands for which
// propose has been called, but which have not yet
@@ -1121,19 +1129,21 @@ func (r *Replica) getEstimatedBehindCountRLocked(raftStatus *raft.Status) int64
return 0
}

// GetMaxBytes atomically gets the range maximum byte limit.
// GetMaxBytes gets the range maximum byte limit.
func (r *Replica) GetMaxBytes() int64 {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.maxBytes
}

// SetMaxBytes atomically sets the maximum byte limit before
// split. This value is cached by the range for efficiency.
// SetMaxBytes sets the maximum byte limit before split.
func (r *Replica) SetMaxBytes(maxBytes int64) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.maxBytes = maxBytes

// Whenever we change maxBytes, reset permitLargeSnapshots.
r.mu.permitLargeSnapshots = false
}

// IsFirstRange returns true if this is the first range.
14 changes: 13 additions & 1 deletion pkg/storage/replica_raftstorage.go
@@ -383,7 +383,19 @@ func (r *Replica) GetSnapshot(
defer r.mu.RUnlock()
rangeID := r.RangeID

if r.exceedsDoubleSplitSizeRLocked() {
// TODO(nvanbenschoten): We should never block snapshots indefinitely. Doing
// so can reduce a range's ability to recover from an under-replicated state
// and can cause unavailability even when a majority of replicas remain
// live. For instance, if a range gets too large to snapshot and requires a
// split in order to do so again, the loss of one up-to-date replica could
// cause it to permanently lose quorum.
//
// For now we still need this check because unbounded snapshots can result
// in OOM errors that crash entire nodes. However, once snapshots are
// streamed from disk to disk, never needing to buffer in-memory on the
// sending or receiving side, we should be able to remove any snapshot size
// limit. See #16954 for more.
if r.exceedsDoubleSplitSizeRLocked() && !r.mu.permitLargeSnapshots {
maxBytes := r.mu.maxBytes
size := r.mu.state.Stats.Total()
err := errors.Errorf(
14 changes: 14 additions & 0 deletions pkg/storage/replica_raftstorage_test.go
@@ -83,6 +83,7 @@ func TestSkipLargeReplicaSnapshot(t *testing.T) {
t.Fatal(err)
}

// Snapshot should succeed.
if snap, err := rep.GetSnapshot(context.Background(), "test"); err != nil {
t.Fatal(err)
} else {
@@ -93,6 +94,7 @@ func TestSkipLargeReplicaSnapshot(t *testing.T) {
t.Fatal(err)
}

// Snapshot should fail.
const expected = "not generating test snapshot because replica is too large"
if _, err := rep.GetSnapshot(context.Background(), "test"); !testutils.IsError(err, expected) {
rep.mu.Lock()
@@ -104,4 +106,16 @@
rep.needsSplitBySize(), rep.exceedsDoubleSplitSizeRLocked(), err,
)
}

// Set the permitLargeSnapshots flag, which bypasses the snapshot size check.
rep.mu.Lock()
rep.mu.permitLargeSnapshots = true
rep.mu.Unlock()

// Snapshot should succeed.
if snap, err := rep.GetSnapshot(context.Background(), "test"); err != nil {
t.Fatal(err)
} else {
snap.Close()
}
}
11 changes: 11 additions & 0 deletions pkg/storage/split_queue.go
@@ -113,6 +113,17 @@ func (sq *splitQueue) process(ctx context.Context, r *Replica, sysCfg config.Sys
roachpb.AdminSplitRequest{},
desc,
); pErr != nil {
// If we failed to split the range and the range is too large to snapshot,
// set the permitLargeSnapshots flag so that we don't continue to block
// large snapshots, which could result in unavailability. The flag is reset
// whenever the split size is adjusted, which includes when the split
// finally succeeds.
// TODO(nvanbenschoten): remove after #16954.
r.mu.Lock()
defer r.mu.Unlock()
if r.exceedsDoubleSplitSizeRLocked() {
r.mu.permitLargeSnapshots = true
}
return pErr.GoError()
} else if !validSplitKey {
// If we couldn't find a split key, set the max-bytes for the range to