Merge pull request #20906 from nvanbenschoten/nvanbenschoten/cherrypick-20589

cherry-pick-1.1: storage: add permitLargeSnapshots flag to replica
nvanbenschoten authored Dec 20, 2017
2 parents 02bdfdf + 4f44c54 commit 2244045
Showing 14 changed files with 285 additions and 30 deletions.
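
The idea behind the change, per the commit title and the new test below: a range that has grown past the snapshot size limit normally refuses to send snapshots, on the assumption that it will split back down to size first. If the split cannot achieve quorum (say, because the only follower that could vote is the one waiting on the snapshot), the replica sets a permitLargeSnapshots flag and serves the oversized snapshot anyway so the range can recover and then split. What follows is a minimal self-contained sketch of that gating idea, not the commit's implementation; canServeSnapshot and onSplitFailedForQuorum are hypothetical names, while permitLargeSnapshots and the PermittingLargeSnapshots helper do appear in the diff.

package main

import (
	"fmt"
	"sync"
)

// replica is a stand-in for storage.Replica; only the pieces relevant
// to the flag are sketched here.
type replica struct {
	mu struct {
		sync.RWMutex
		permitLargeSnapshots bool // set once a split fails for lack of quorum
	}
}

// canServeSnapshot (hypothetical) gates snapshot generation: refuse
// snapshots above the size limit unless large snapshots are permitted.
func (r *replica) canServeSnapshot(snapSize, maxSize int64) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return snapSize <= maxSize || r.mu.permitLargeSnapshots
}

// onSplitFailedForQuorum (hypothetical) is the adaptation step: once a
// split fails for lack of quorum, permit large snapshots so followers
// can catch up, after which the range can finally split.
func (r *replica) onSplitFailedForQuorum() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.permitLargeSnapshots = true
}

func main() {
	var r replica
	const maxBytes = 1 << 16 // mirrors the zone config used in the test below
	fmt.Println(r.canServeSnapshot(2*maxBytes+1, maxBytes)) // false: expected to split first
	r.onSplitFailedForQuorum()
	fmt.Println(r.canServeSnapshot(2*maxBytes+1, maxBytes)) // true: recovery permitted
}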
6 changes: 1 addition & 5 deletions pkg/storage/client_raft_test.go
@@ -3635,13 +3635,9 @@ func TestInitRaftGroupOnRequest(t *testing.T) {
t.Fatal("replica should not be nil for RHS range")
}

-// TODO(spencer): Raft messages seem to turn up
-// occasionally on restart, which initialize the replica, so
-// this is not a test failure. Not sure how to work around this
-// problem.
+// Verify the raft group isn't initialized yet.
if repl.IsRaftGroupInitialized() {
-log.Errorf(context.TODO(), "expected raft group to be uninitialized")
t.Fatal("expected raft group to be uninitialized")
}

// Send an increment and verify that initializes the Raft group.
170 changes: 167 additions & 3 deletions pkg/storage/client_split_test.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
@@ -789,7 +790,7 @@ func TestStoreRangeSplitStatsWithMerges(t *testing.T) {
// fillRange writes keys with the given prefix and associated values
// until bytes bytes have been written or the given range has split.
func fillRange(
-store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64, t *testing.T,
+t *testing.T, store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64,
) {
src := rand.New(rand.NewSource(0))
for {
@@ -802,7 +803,7 @@ func fillRange(
return
}
key := append(append([]byte(nil), prefix...), randutil.RandBytes(src, 100)...)
-key = keys.MakeFamilyKey(key, 0)
+key = keys.MakeFamilyKey(key, src.Uint32())
val := randutil.RandBytes(src, int(src.Int31n(1<<8)))
pArgs := putArgs(key, val)
_, pErr := client.SendWrappedWith(context.Background(), store, roachpb.Header{
@@ -861,7 +862,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
}

// Look in the range after prefix we're writing to.
-fillRange(store, repl.RangeID, tableBoundary, maxBytes, t)
+fillRange(t, store, repl.RangeID, tableBoundary, maxBytes)
}

// Verify that the range is in fact split.
@@ -912,6 +913,169 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
})
}

// TestStoreRangeSplitAfterLargeSnapshot tests a scenario where a range is too
// large to snapshot a follower, but is unable to split because it cannot
// achieve quorum. The leader of the range should adapt to this, eventually
// permitting the large snapshot so that it can recover and then split
// successfully.
func TestStoreRangeSplitAfterLargeSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set maxBytes to something small so we can exceed the maximum snapshot
// size without adding 2x64MB of data.
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
})()

// Create a three node cluster.
sc := storage.TestStoreConfig(nil)
sc.RaftElectionTimeoutTicks = 1000000
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)
store0 := mtc.stores[0]
forAllLiveStores := func(f func(*storage.Store)) {
for _, store := range mtc.stores {
if store != nil {
f(store)
}
}
}

// The behindNode falls behind far enough to require a snapshot.
const behindNode = 1
// The crashingNode crashes after its single range becomes too large to
// snapshot.
const crashingNode = 2

// Wait for initial splits.
t.Log("waiting for initial splits")
forAllLiveStores(func(store *storage.Store) {
store.SetRaftSnapshotQueueActive(true)
store.SetSplitQueueActive(true)
store.ForceSplitScanAndProcess()
})
if err := server.WaitForInitialSplits(store0.DB()); err != nil {
t.Fatal(err)
}

// Then do a write; we'll use this to determine when the dust has settled.
t.Log("performing first write")
keyPrefix := append(keys.UserTableDataMin, []byte("key")...)
repl := store0.LookupReplica(roachpb.RKey(keyPrefix), nil)
rangeID := repl.RangeID
header := roachpb.Header{RangeID: rangeID}
incArgs := incrementArgs(keyPrefix, 1)
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}

// Replicate the range we'll play with to the other nodes.
t.Log("replicating range")
mtc.replicateRange(rangeID, behindNode, crashingNode)
mtc.waitForValues(keyPrefix, []int64{1, 1, 1})

// Fill the range without allowing splits so that it will try to split once
// the splitQueue is re-enabled. Fill it past the snapshot size limit
// enforced in Replica.GetSnapshot. We do this before stopping behindNode so
// that the quotaPool does not throttle progress.
t.Log("filling range")
forAllLiveStores(func(store *storage.Store) {
store.SetSplitQueueActive(false)
})
fillRange(t, store0, rangeID, keyPrefix, 2*maxBytes+1)

// Turn off replica scanner and snapshot queue. We'll control queues
// directly from now on.
forAllLiveStores(func(store *storage.Store) {
store.SetReplicaScannerActive(false)
store.SetRaftSnapshotQueueActive(false)
})

// Stop behindNode so it falls behind and will require a snapshot.
t.Log("letting one follower fall behind")
mtc.stopStore(behindNode)

// Let behindNode fall behind.
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyPrefix, []int64{2, 1, 2})

// Truncate the replica's log. This ensures that the only way behindNode can
// recover is through a snapshot.
index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
truncArgs := truncateLogArgs(index+1, rangeID)
truncArgs.Key = repl.Desc().StartKey.AsRawKey()
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, truncArgs); pErr != nil {
t.Fatal(pErr)
}

// The range can still make forward progress.
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyPrefix, []int64{3, 1, 3})

// Determine the range count.
prevRangeCount := store0.ReplicaCount()

// Stop crashingNode so that we lose quorum and can no longer split.
// Bring behindNode back up.
t.Log("killing the other follower")
mtc.stopStore(crashingNode)
mtc.restartStore(behindNode)

// Reactivate the split queue and reduce its timeout so it times out due
// to a lack of quorum faster. Force a split, which should fail because it
// cannot achieve quorum. This in turn should set the permitLargeSnapshots
// flag.
t.Log("attempting a split without quorum; this should fail")
forAllLiveStores(func(store *storage.Store) {
store.SetSplitQueueProcessTimeout(1 * time.Second)
store.SetSplitQueueActive(true)
store.ForceSplitScanAndProcess()
})
testutils.SucceedsSoon(t, func() error {
if !repl.PermittingLargeSnapshots() {
return errors.Errorf("replica not permitting large snapshots")
}
return nil
})

// Now that the permitLargeSnapshot flag is set, we should see
// the range recover after behindNode is sent a snapshot.
t.Log("waiting for large snapshot to succeed")
forAllLiveStores(func(store *storage.Store) {
store.SetRaftSnapshotQueueActive(true)
store.ForceRaftSnapshotQueueProcess()
})
mtc.waitForValues(keyPrefix, []int64{3, 3, 3})

// Once the range has a majority of up-to-date nodes, it should be
// able to split. We first increment the manual clock to make sure
// any dangling intents left by previous splits expire.
t.Log("waiting for split to succeed")
mtc.manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1)
forAllLiveStores(func(store *storage.Store) {
store.ForceSplitScanAndProcess()
})
testutils.SucceedsSoon(t, func() error {
if store0.ReplicaCount() < prevRangeCount+1 {
return errors.Errorf("expected new range created by split")
}
return nil
})

// Per the contract on multiTestContext.stopStore, we need to restart the
// stopped store before calling multiTestContext.Stop.
mtc.restartStore(crashingNode)
}

// TestStoreRangeSystemSplits verifies that splits are based on the contents of
// the SystemConfig span.
func TestStoreRangeSystemSplits(t *testing.T) {
6 changes: 6 additions & 0 deletions pkg/storage/client_test.go
@@ -874,6 +874,11 @@ func (m *multiTestContext) stopStore(i int) {

m.mu.Lock()
m.stoppers[i] = nil
// Break the transport breaker for this node so that messages sent between a
// store stopping and that store restarting will never remain in-flight in
// the transport and end up reaching the store. This has been the cause of
// flakiness in the past.
m.transport.GetCircuitBreaker(m.idents[i].NodeID).Break()
m.senders[i].RemoveStore(m.stores[i])
m.stores[i] = nil
m.mu.Unlock()
@@ -905,6 +910,7 @@ func (m *multiTestContext) restartStore(i int) {
m.t.Fatal(err)
}
m.senders[i].AddStore(store)
m.transport.GetCircuitBreaker(m.idents[i].NodeID).Reset()
m.mu.Unlock()
cfg.NodeLiveness.StartHeartbeat(ctx, stopper, func(ctx context.Context) {
now := m.clock.Now()
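
The Break/Reset pairing above is the crux: tripping the breaker when a store stops makes any message still queued in the shared transport fail fast rather than linger and reach the store after it restarts, and resetting it re-enables traffic once the restarted store is registered. Below is a toy, self-contained sketch of that discipline; the breaker type and send function are stand-ins, not the real RaftTransport or circuit-breaker API.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// breaker is a toy stand-in for the transport circuit breaker used in the
// diff above: Break trips it by hand, Reset re-arms it.
type breaker struct{ tripped atomic.Bool }

func (b *breaker) Break()      { b.tripped.Store(true) }
func (b *breaker) Reset()      { b.tripped.Store(false) }
func (b *breaker) Ready() bool { return !b.tripped.Load() }

// send delivers a message only when the destination's breaker is ready,
// mirroring how a tripped breaker keeps messages sent around a restart
// from ever reaching the store.
func send(b *breaker, msg string) error {
	if !b.Ready() {
		return errors.New("breaker tripped; dropped: " + msg)
	}
	fmt.Println("delivered:", msg)
	return nil
}

func main() {
	var b breaker
	_ = send(&b, "raft msg 1") // delivered
	b.Break()                  // store stopping: queued traffic now fails fast
	if err := send(&b, "raft msg 2"); err != nil {
		fmt.Println(err) // dropped while the store is down
	}
	b.Reset()                  // store restarted: traffic flows again
	_ = send(&b, "raft msg 3") // delivered
}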
14 changes: 14 additions & 0 deletions pkg/storage/helpers_test.go
@@ -176,6 +176,12 @@ func (s *Store) SetReplicaScannerActive(active bool) {
s.setScannerActive(active)
}

// SetSplitQueueProcessTimeout sets the timeout for processing a replica in the
// split queue.
func (s *Store) SetSplitQueueProcessTimeout(dur time.Duration) {
s.splitQueue.SetProcessTimeout(dur)
}

// GetOrCreateReplica passes through to its lowercase sibling.
func (s *Store) GetOrCreateReplica(
ctx context.Context,
@@ -347,6 +353,14 @@ func (r *Replica) GetTimestampCacheLowWater() hlc.Timestamp {
return t
}

// PermittingLargeSnapshots returns whether the replica is permitting large
// snapshots.
func (r *Replica) PermittingLargeSnapshots() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.permitLargeSnapshots
}

// GetRaftLogSize returns the raft log size.
func (r *Replica) GetRaftLogSize() int64 {
r.mu.Lock()
16 changes: 10 additions & 6 deletions pkg/storage/id_alloc.go
@@ -81,15 +81,19 @@ func newIDAllocator(
}

// Allocate allocates a new ID from the global KV DB.
-func (ia *idAllocator) Allocate() (uint32, error) {
+func (ia *idAllocator) Allocate(ctx context.Context) (uint32, error) {
ia.once.Do(ia.start)

-id := <-ia.ids
-// when the channel is closed, the zero value is returned.
-if id == 0 {
-return id, errors.Errorf("could not allocate ID; system is draining")
-}
-return id, nil
+select {
+case id := <-ia.ids:
+// when the channel is closed, the zero value is returned.
+if id == 0 {
+return id, errors.Errorf("could not allocate ID; system is draining")
+}
+return id, nil
+case <-ctx.Done():
+return 0, ctx.Err()
+}
}

func (ia *idAllocator) start() {
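
With the select on ctx.Done(), Allocate turns from an unbounded channel receive into a bounded wait: a caller that passes a context with a deadline gets ctx.Err() back instead of blocking forever on a stalled or draining allocator, as the new test below exercises. Here is a hypothetical caller-side sketch; the allocator interface and stalledAllocator are illustrative stand-ins, not types from the package.

package main

import (
	"context"
	"fmt"
	"time"
)

// allocator is a stand-in interface exposing the new context-aware
// Allocate contract from the hunk above.
type allocator interface {
	Allocate(ctx context.Context) (uint32, error)
}

// stalledAllocator never produces an ID, mimicking an idAllocator whose
// ids channel is empty because the backing KV writes are blocked.
type stalledAllocator struct{}

func (stalledAllocator) Allocate(ctx context.Context) (uint32, error) {
	<-ctx.Done() // nothing to receive; only the context can end the wait
	return 0, ctx.Err()
}

// allocateWithTimeout shows the caller-side benefit of the change: a
// bounded wait instead of blocking forever.
func allocateWithTimeout(ia allocator, d time.Duration) (uint32, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return ia.Allocate(ctx)
}

func main() {
	if _, err := allocateWithTimeout(stalledAllocator{}, 10*time.Millisecond); err != nil {
		fmt.Println(err) // context deadline exceeded
	}
}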
24 changes: 17 additions & 7 deletions pkg/storage/id_alloc_test.go
@@ -56,7 +56,7 @@ func TestIDAllocator(t *testing.T) {
for i := 0; i < maxI; i++ {
go func() {
for j := 0; j < maxJ; j++ {
-id, err := idAlloc.Allocate()
+id, err := idAlloc.Allocate(context.Background())
errChan <- err
allocd <- id
}
@@ -111,7 +111,7 @@ func TestIDAllocatorNegativeValue(t *testing.T) {
if err != nil {
t.Errorf("failed to create IDAllocator: %v", err)
}
-value, err := idAlloc.Allocate()
+value, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -158,7 +158,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
t.Errorf("failed to create IDAllocator: %v", err)
}

-firstID, err := idAlloc.Allocate()
+firstID, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -172,7 +172,7 @@
// Should be able to get the allocated IDs, and there will be one
// background allocateBlock to get ID continuously.
for i := 0; i < 8; i++ {
-id, err := idAlloc.Allocate()
+id, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -194,7 +194,7 @@
errChan <- nil
}

-id, err := idAlloc.Allocate()
+id, err := idAlloc.Allocate(context.Background())
errChan <- err
allocd <- id
}()
@@ -207,6 +207,16 @@
}
}

// Attempt a few allocations with a context timeout while allocations are
// blocked. All attempts should hit a context deadline exceeded error.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
for i := 0; i < routines; i++ {
id, err := idAlloc.Allocate(ctx)
if id != 0 || err != context.DeadlineExceeded {
t.Errorf("expected context cancellation, found id=%d, err=%v", id, err)
}
}

// Make the IDAllocator valid again.
idAlloc.idKey.Store(keys.RangeIDGenerator)
// Check if the blocked allocations return expected ID.
@@ -226,7 +236,7 @@

// Check if the following allocations return expected ID.
for i := 0; i < routines; i++ {
-id, err := idAlloc.Allocate()
+id, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -254,7 +264,7 @@ func TestAllocateWithStopper(t *testing.T) {
return idAlloc
}()

-if _, err := idAlloc.Allocate(); !testutils.IsError(err, "system is draining") {
+if _, err := idAlloc.Allocate(context.Background()); !testutils.IsError(err, "system is draining") {
t.Errorf("unexpected error: %v", err)
}
}
7 changes: 7 additions & 0 deletions pkg/storage/queue.go
@@ -302,6 +302,13 @@ func (bq *baseQueue) Disabled() bool {
return bq.mu.disabled
}

// SetProcessTimeout sets the timeout for processing a replica.
func (bq *baseQueue) SetProcessTimeout(dur time.Duration) {
bq.processMu.Lock()
bq.processTimeout = dur
bq.processMu.Unlock()
}

// Start launches a goroutine to process entries in the queue. The
// provided stopper is used to finish processing.
func (bq *baseQueue) Start(clock *hlc.Clock, stopper *stop.Stopper) {
(Diffs for the remaining 7 of the 14 changed files are not shown.)
