From f620218eea70c4f67d50452599f3a28111b66b2d Mon Sep 17 00:00:00 2001 From: Joey Pereira Date: Tue, 10 Oct 2017 00:36:17 -0400 Subject: [PATCH] sql: Refactor TableDescriptor lease acquisition to use singleflight This changes the acquisition of leases to use singleflight to reduce the complexity of handling concurrent acquisitions. The behaviour is kept the same as we block on the results of singleflight.DoChan. The change is in preparation for adding a new codepath which acquires leases asynchronously. This also refactors out the jitter multiplier into a constant. --- pkg/sql/lease.go | 202 +++++++++++++++++++----------- pkg/sql/lease_internal_test.go | 216 ++++++++++++++++++++++++++++++++- pkg/sql/lease_test.go | 4 +- 3 files changed, 351 insertions(+), 71 deletions(-) diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 25924c2ccfe6..cf9eb1101e90 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -46,9 +47,16 @@ import ( const ( // LeaseDuration is the mean duration a lease will be acquired for. The - // actual duration is jittered in the range - // [0.75,1.25]*LeaseDuration. Exported for testing purposes only. + // actual duration is jittered using LeaseJitterFraction. Jittering is done to + // prevent multiple leases from being renewed simultaneously if they were all + // acquired simultaneously. Exported for testing purposes only. LeaseDuration = 5 * time.Minute + // LeaseJitterFraction is the factor that we use to randomly jitter the lease + // duration when acquiring a new lease and the lease renewal timeout. 
The + // range of the actual lease duration will be + // [(1-LeaseJitterFraction) * LeaseDuration, (1+LeaseJitterFraction) * LeaseDuration] + // Exported for testing purposes only. + LeaseJitterFraction = 0.25 ) // tableVersionState holds the state for a table version. This includes @@ -118,18 +126,21 @@ type LeaseStore struct { clock *hlc.Clock nodeID *base.NodeIDContainer - // leaseDuration is a constant initialized by NewLeaseManager. It is modified - // only during by tests through testSetLeaseDuration. - leaseDuration time.Duration + // leaseDuration and leasejitterFraction are a constants initialized by + // NewLeaseManager. They are modified only during by tests through + // testSetLeaseDuration and testSetLeaseJitterFraction. + leaseDuration time.Duration + leaseJitterFraction float64 testingKnobs LeaseStoreTestingKnobs memMetrics *MemoryMetrics } // jitteredLeaseDuration returns a randomly jittered duration from the interval -// [0.75 * leaseDuration, 1.25 * leaseDuration]. -func jitteredLeaseDuration(leaseDuration time.Duration) time.Duration { - return time.Duration(float64(leaseDuration) * (0.75 + 0.5*rand.Float64())) +// [(1-LeaseJitterFraction) * leaseDuration, (1+LeaseJitterFraction) * leaseDuration]. +func jitteredLeaseDuration(leaseDuration time.Duration, leaseJitterFraction float64) time.Duration { + return time.Duration(float64(leaseDuration) * (1 - leaseJitterFraction + + 2*leaseJitterFraction*rand.Float64())) } // acquire a lease on the most recent version of a table descriptor. 
@@ -141,7 +152,7 @@ func (s LeaseStore) acquire( var table *tableVersionState err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { expiration := txn.OrigTimestamp() - expiration.WallTime += int64(jitteredLeaseDuration(s.leaseDuration)) + expiration.WallTime += int64(jitteredLeaseDuration(s.leaseDuration, s.leaseJitterFraction)) if expiration.Less(minExpirationTime) { expiration = minExpirationTime } @@ -445,6 +456,11 @@ func (s LeaseStore) getForExpiration( return table, err } +// leaseToken is an opaque token representing a lease. It's distinct from a +// lease to define restricted capabilities and prevent improper use of a lease +// where we instead have leaseTokens. +type leaseToken *tableVersionState + // tableSet maintains an ordered set of tableVersionState objects sorted // by version. It supports addition and removal of elements, finding the // table for a particular version, or finding the most recent table version. @@ -470,6 +486,15 @@ func (l *tableSet) String() string { return buf.String() } +// isNewest checks if the leaseToken represents the newest lease in the +// tableSet. +func (l *tableSet) isNewest(t leaseToken) bool { + if len(l.data) == 0 { + return false + } + return leaseToken(l.data[len(l.data)-1]) == t +} + func (l *tableSet) insert(s *tableVersionState) { i, match := l.findIndex(s.Version) if match { @@ -540,6 +565,8 @@ func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionS return nil } +const acquireGroupKey = "acquire" + type tableState struct { id sqlbase.ID // The cache is updated every time we acquire or release a table. @@ -549,6 +576,9 @@ type tableState struct { mu struct { syncutil.Mutex + // group is used for all calls made to acquireNodeLease to prevent + // concurrent lease acquisitions from the store. + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest // entry. 
There may be more than one active lease when the system is @@ -558,11 +588,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -578,17 +603,26 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { - return nil, err + // Acquire a lease if no lease exists or if the latest lease is about to + // expire. Looping is necessary because lease acquisition is done without + // holding the tableState lock, so anything can happen in between lease + // acquisition and us getting control again. 
+ for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + var resultChan <-chan singleflight.Result + resultChan, _ = t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent() + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return nil, result.Err } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -701,20 +735,59 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // We need to acquire a lease on a "fresh" descriptor, meaning that joining + // a potential in-progress lease acquisition is generally not good enough. + // If we are to join an in-progress acquisition, it needs to be an acquisition + // initiated after this point. + // So, we handle two cases: + // 1. The first DoChan() call tells us that we didn't join an in-progress + // acquisition. Great, the lease that's being acquired is good. + // 2. The first DoChan() call tells us that we did join an in-progress acq. + // We have to wait this acquisition out; it's not good for us. But any + // future acquisition is good, so the next time around the loop it doesn't + // matter if we initiate a request or join an in-progress one. + // In both cases, we need to check if the lease we want is still valid because + // lease acquisition is done without holding the tableState lock, so anything + // can happen in between lease acquisition and us getting control again. + attemptsMade := 0 + for { + // Move forward the expiry to acquire a fresh table lease. 
+ + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active. + // TODO(vivek): the expiration time is no longer needed to sort the + // tableVersionState. Get rid of this. + minExpirationTime := hlc.Timestamp{} + s := t.mu.active.findNewest() + if s != nil { + minExpirationTime = s.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + resultChan, wasCalled := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireFreshestResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireFreshestResultBlockEvent() + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return result.Err + } + l := result.Val.(leaseToken) + if wasCalled && t.mu.active.isNewest(l) { + // Case 1: we didn't join an in-progress call and the lease is still + // valid. + break + } else if attemptsMade > 1 && t.mu.active.isNewest(l) { + // Case 2: more than one acquisition has happened and the lease is still + // valid. + break + } + attemptsMade++ } - - return t.acquireNodeLease(ctx, m, minExpirationTime) + return nil } // upsertLocked inserts a lease for a particular table version. @@ -761,18 +834,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. 
- for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. // minExpirationTime, if not set to the zero value, will be used as a lower @@ -780,32 +841,21 @@ func (t *tableState) acquireWait() { // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, -) error { +) (leaseToken, error) { if m.isDraining() { - return errors.New("cannot acquire lease when draining") + return nil, errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. - t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) - t.mu.Lock() if err != nil { - return err + return nil, err } + t.mu.Lock() + defer t.mu.Unlock() t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) - return nil + return leaseToken(table), nil } func (t *tableState) release(table *sqlbase.TableDescriptor, m *LeaseManager) error { @@ -936,6 +986,12 @@ type LeaseStoreTestingKnobs struct { LeaseReleasedEvent func(table sqlbase.TableDescriptor, err error) // Called after a lease is acquired, with any operation error. 
LeaseAcquiredEvent func(table sqlbase.TableDescriptor, err error) + // Called before waiting on a result from a DoChan call of acquireNodeLease + // in tableState.acquire(), with any operation error. + LeaseAcquireResultBlockEvent func() + // Called before waiting on a result from a DoChan call of acquireNodeLease + // in tableState.acquireFreshestFromStoreLocked(), with any operation error. + LeaseAcquireFreshestResultBlockEvent func() // RemoveOnceDereferenced forces leases to be removed // as soon as they are dereferenced. RemoveOnceDereferenced bool @@ -1103,12 +1159,13 @@ func NewLeaseManager( ) *LeaseManager { lm := &LeaseManager{ LeaseStore: LeaseStore{ - db: db, - clock: clock, - nodeID: nodeID, - leaseDuration: LeaseDuration, - testingKnobs: testingKnobs.LeaseStoreTestingKnobs, - memMetrics: memMetrics, + db: db, + clock: clock, + nodeID: nodeID, + leaseDuration: LeaseDuration, + leaseJitterFraction: LeaseJitterFraction, + testingKnobs: testingKnobs.LeaseStoreTestingKnobs, + memMetrics: memMetrics, }, testingKnobs: testingKnobs, tableNames: tableNameCache{ @@ -1136,6 +1193,13 @@ func (m *LeaseManager) TestSetLeaseDuration(leaseDuration time.Duration) { m.LeaseStore.leaseDuration = leaseDuration } +// TestSetLeaseJitterFraction is used to modify the leaseJitterFraction constant +// for the lifespan of the LeaseManager. This function exists and is exported only +// for testing purposes. +func (m *LeaseManager) TestSetLeaseJitterFraction(leaseJitterFraction float64) { + m.LeaseStore.leaseJitterFraction = leaseJitterFraction +} + // AcquireByName returns a table version for the specified table valid for // the timestamp. It returns the table descriptor and a expiration time. 
// A transaction using this descriptor must ensure that its diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 7229072cbc76..13eaa93a451d 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -19,16 +19,19 @@ package sql import ( "fmt" "sync" + "sync/atomic" "testing" "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" ) func TestTableSet(t *testing.T) { @@ -411,7 +414,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Try to trigger the race repeatedly: race an AcquireByName against a // Release. - // tableChan acts as a barrier, synchornizing the two routines at every + // tableChan acts as a barrier, synchronizing the two routines at every // iteration. tableChan := make(chan *sqlbase.TableDescriptor) errChan := make(chan error) @@ -558,3 +561,214 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); wg.Wait() } + +// Test one possible outcome of a race between a lease acquisition (the first +// case through tableState.acquire(), the second through +// tableState.acquireFreshestFromStore()) and a release of the lease that was +// just acquired. Precisely: +// 1. Thread 1 calls either acquireFreshestFromStore() or acquire(). +// 2. Thread 1 releases the lock on tableState and starts acquisition of a lease +// from the store, blocking until it's finished. +// 3. Thread 2 calls acquire(). The lease has not been acquired yet, so it +// also enters the acquisition code path (calling DoChan). +// 4. Thread 2 proceeds to release the lock on tableState waiting for the +// in-flight acquisition. +// 4. 
The lease is acquired from the store and the waiting routines are +// unblocked. +// 5. Thread 2 unblocks first, and releases the new lease, for whatever reason. +// 6. Thread 1 wakes up. At this point, a naive implementation would use the +// newly acquired lease, which would be incorrect. The test checks that +// acquireFreshestFromStore() or acquire() notices, after re-acquiring the +// tableState lock, that the new lease has been released and acquires a new +// one. +func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Result is a struct for moving results to the main result routine. + type Result struct { + table *sqlbase.TableDescriptor + exp hlc.Timestamp + err error + } + + descID := sqlbase.ID(keys.LeaseTableID) + + // acquireBlock calls Acquire and sends the result to the acquire channel. + acquireBlock := func( + ctx context.Context, + m *LeaseManager, + acquireChan chan Result, + ) { + table, e, err := m.Acquire(ctx, m.clock.Now(), descID) + acquireChan <- Result{err: err, exp: e, table: table} + } + + // acquireFreshestBlock calls acquireFreshestFromStore and sends the result + // to the acquire channel. + acquireFreshestBlock := func( + ctx context.Context, + m *LeaseManager, + acquireChan chan Result, + ) { + table, e, err := m.acquireFreshestFromStore(ctx, descID) + acquireChan <- Result{err: err, exp: e, table: table} + } + + testCases := []struct { + // The name of the test + name string + // The routine being called during the test as the "thread 1" mentioned in + // the comment preceding this test. + routineFunc func(ctx context.Context, m *LeaseManager, acquireChan chan Result) + // Whether the second routine is a call to LeaseManager.acquireFreshest or + // not. This determines which channel we unblock. + isSecondCallAcquireFreshest bool + }{ + // This test case checks if the race condition occurs where thread 1 calls + // tableState.acquire(). 
+ { + name: "CallAcquireConcurrently", + routineFunc: acquireBlock, + isSecondCallAcquireFreshest: false, + }, + // This test case checks if the race condition occurs where thread 1 calls + // tableState.acquireFreshestFromStore(). + { + name: "CallAcquireFreshestAndAcquireConcurrently", + routineFunc: acquireFreshestBlock, + isSecondCallAcquireFreshest: true, + }, + } + + for _, test := range testCases { + ctx := context.Background() + t.Run(test.name, func(t *testing.T) { + // blockChan and freshestBlockChan is used to set up the race condition. + blockChan := make(chan struct{}) + freshestBlockChan := make(chan struct{}) + // acquisitionBlock is used to prevent acquireNodeLease from + // completing, to force a lease to delay its acquisition. + acquisitionBlock := make(chan struct{}) + + // preblock is used for the main routine to wait for all acquisition + // routines to catch up. + var preblock sync.WaitGroup + // acquireArrivals and acquireFreshestArrivals tracks how many times + // we've arrived at the knob codepath for the corresponding functions. + // This is needed because the fix to the race condition hits the knob more + // than once in a single routine, so we need to ignore any extra passes. + var acquireArrivals int32 + var acquireFreshestArrivals int32 + // leasesAcquiredCount counts how many leases were acquired in total. 
+ var leasesAcquiredCount int32 + + removalTracker := NewLeaseRemovalTracker() + testingKnobs := base.TestingKnobs{ + SQLLeaseManager: &LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{ + RemoveOnceDereferenced: true, + LeaseReleasedEvent: removalTracker.LeaseRemovedNotification, + LeaseAcquireFreshestResultBlockEvent: func() { + log.Warningf(ctx, "AcquireFreshestPrelockEvent") + if atomic.LoadInt32(&acquireFreshestArrivals) < 1 { + log.Warningf(ctx, "AcquireFreshestPrelockEvent first arrival: blocking") + atomic.AddInt32(&acquireFreshestArrivals, 1) + preblock.Done() + <-freshestBlockChan + log.Warningf(ctx, "AcquireFreshestPrelockEvent first arrival: unblocking") + } + }, + LeaseAcquireResultBlockEvent: func() { + if count := atomic.LoadInt32(&acquireArrivals); (count < 1 && test.isSecondCallAcquireFreshest) || + (count < 2 && !test.isSecondCallAcquireFreshest) { + atomic.AddInt32(&acquireArrivals, 1) + preblock.Done() + <-blockChan + } + }, + LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) { + atomic.AddInt32(&leasesAcquiredCount, 1) + <-acquisitionBlock + }, + }, + }, + } + + s, _, _ := serverutils.StartServer( + t, base.TestServerArgs{Knobs: testingKnobs}) + defer s.Stopper().Stop(context.TODO()) + leaseManager := s.LeaseManager().(*LeaseManager) + + // Set the lease jitter so leases will have monotonically increasing + // expiration. This prevents a false test failure if two leases happen to + // have the same expiration due to randomness as expiration is used to + // check if two leases are equal. + leaseManager.TestSetLeaseJitterFraction(0) + + acquireResultChan := make(chan Result) + + // Start two routines to acquire and release. + preblock.Add(2) + go acquireBlock(ctx, leaseManager, acquireResultChan) + go test.routineFunc(ctx, leaseManager, acquireResultChan) + + // Wait until both routines arrive + preblock.Wait() + + // Allow the acquisition to finish. 
By delaying it until now, we guarantee + // both routines will receive the same lease. + acquisitionBlock <- struct{}{} + + // Allow the first routine to finish acquisition. In the case where both + // routines are calling Acquire(), first refers to whichever routine + // continues, order does not matter. + blockChan <- struct{}{} + // Wait for the first routines results + result1 := <-acquireResultChan + if result1.err != nil { + t.Fatal(result1.err) + } + + // Release and remove the lease + tracker := removalTracker.TrackRemoval(result1.table) + if err := leaseManager.Release(result1.table); err != nil { + t.Fatal(err) + } + // Wait until the lease is fully removed. + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + + // Allow the second routine to proceed. + if test.isSecondCallAcquireFreshest { + freshestBlockChan <- struct{}{} + } else { + blockChan <- struct{}{} + } + + // Allow all future acquisitions to complete. + close(acquisitionBlock) + + // Get the acquisition results of the second routine. + result2 := <-acquireResultChan + if result2.err != nil { + t.Fatal(result2.err) + } + // Release and remove the lease + tracker = removalTracker.TrackRemoval(result2.table) + if err := leaseManager.Release(result2.table); err != nil { + t.Fatal(err) + } + // Wait until the lease is fully removed. + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + + if result1.table == result2.table && result1.exp == result2.exp { + t.Fatalf("Expected the leases to be different. TableDescriptor pointers are equal and both the same expiration") + } else if count := atomic.LoadInt32(&leasesAcquiredCount); count != 2 { + t.Fatalf("Expected to acquire 2 leases, instead got %d", count) + } + }) + } +} diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index f8281169b731..ee52c5c42114 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -230,8 +230,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. 
l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.