From f52f0b23822ec1b138dc05ab32c655c0a19c7c7d Mon Sep 17 00:00:00 2001 From: Joey Pereira Date: Mon, 16 Oct 2017 17:01:48 -0400 Subject: [PATCH] sql: Refactor TableDescriptor lease acquisition to use singleflight This changes the acquisition of leases to use singleflight to reduce the complexity of handling concurrent acquisitions. The behaviour is kept the same as we block on the results of singleflight.DoChan. The change is in preparation for adding a new codepath which acquires leases asynchronously. --- pkg/sql/lease.go | 164 ++++++++++++++++--------- pkg/sql/lease_internal_test.go | 214 ++++++++++++++++++++++++++++++++- pkg/sql/lease_test.go | 4 +- 3 files changed, 326 insertions(+), 56 deletions(-) diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 29873bf14f1c..1e53c83d494c 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -447,6 +448,11 @@ func (s LeaseStore) getForExpiration( return table, err } +// leaseToken is an opaque token representing a lease. It's distinct from a +// lease to define restricted capabilities and prevent improper use of a lease +// where we instead have leaseTokens. +type leaseToken *tableVersionState + // tableSet maintains an ordered set of tableVersionState objects sorted // by version. It supports addition and removal of elements, finding the // table for a particular version, or finding the most recent table version. @@ -472,6 +478,15 @@ func (l *tableSet) String() string { return buf.String() } +// isNewest checks if the leaseToken represents the newest lease in the +// tableSet. +func (l *tableSet) isNewest(t leaseToken) bool { + if len(l.data) == 0 { + return false + } + return leaseToken(l.data[len(l.data)-1]) == t +} + func (l *tableSet) insert(s *tableVersionState) { i, match := l.findIndex(s.Version) if match { @@ -542,6 +557,8 @@ func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionS return nil } +const acquireGroupKey = "acquire" + type tableState struct { id sqlbase.ID // The cache is updated every time we acquire or release a table. @@ -551,6 +568,9 @@ type tableState struct { mu struct { syncutil.Mutex + // group is used for all calls made to acquireNodeLease to prevent + // concurrent lease acquisitions from the store. + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest // entry. There may be more than one active lease when the system is @@ -560,11 +580,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -580,17 +595,26 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { - return nil, err + // Acquire a lease if no lease exists or if the latest lease is about to + // expire. Looping is necessary because lease acquisition is done without + // holding the tableState lock, so anything can happen in between lease + // acquisition and us getting control again. + for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + var resultChan <-chan singleflight.Result + resultChan, _ = t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireBlock) + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return nil, result.Err } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -703,20 +727,59 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // We need to acquire a lease on a "fresh" descriptor, meaning that joining + // a potential in-progress lease acquisition is generally not good enough. + // If we are to join an in-progress acquisition, it needs to be an acquisition + // initiated after this point. + // So, we handle two cases: + // 1. The first DoChan() call tells us that we didn't join an in-progress + // acquisition. Great, the lease that's being acquired is good. + // 2. The first DoChan() call tells us that we did join an in-progress acq. + // We have to wait this acquisition out; it's not good for us. But any + // future acquisition is good, so the next time around the loop it doesn't + // matter if we initiate a request or join an in-progress one. + // In both cases, we need to check if the lease we want is still valid because + // lease acquisition is done without holding the tableState lock, so anything + // can happen in between lease acquisition and us getting control again. + attemptsMade := 0 + for { + // Move forward the expiry to acquire a fresh table lease. + + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active. + // TODO(vivek): the expiration time is no longer needed to sort the + // tableVersionState. Get rid of this. + minExpirationTime := hlc.Timestamp{} + s := t.mu.active.findNewest() + if s != nil { + minExpirationTime = s.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + resultChan, wasCalled := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireFreshestBlock) + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return result.Err + } + l := result.Val.(leaseToken) + if wasCalled && t.mu.active.isNewest(l) { + // Case 1: we didn't join an in-progress call and the lease is still + // valid. + break + } else if attemptsMade > 1 && t.mu.active.isNewest(l) { + // Case 2: more than one acquisition has happened and the lease is still + // valid. + break + } + attemptsMade++ } - - return t.acquireNodeLease(ctx, m, minExpirationTime) + return nil } // upsertLocked inserts a lease for a particular table version. @@ -763,18 +826,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. - for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. // minExpirationTime, if not set to the zero value, will be used as a lower @@ -782,32 +833,21 @@ func (t *tableState) acquireWait() { // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, -) error { +) (leaseToken, error) { if m.isDraining() { - return errors.New("cannot acquire lease when draining") + return nil, errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. - t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) - t.mu.Lock() if err != nil { - return err + return nil, err } + t.mu.Lock() + defer t.mu.Unlock() t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) - return nil + return leaseToken(table), nil } func (t *tableState) release(table *sqlbase.TableDescriptor, m *LeaseManager) error { @@ -931,6 +971,19 @@ func (t *tableState) purgeOldVersions( return err } +// LeaseAcquireBlockType is the type of blocking result event when +// calling LeaseAcquireResultBlockEvent. +type LeaseAcquireBlockType int + +const ( + // LeaseAcquireBlock denotes the LeaseAcquireResultBlockEvent is + // coming from tableState.acquire(). + LeaseAcquireBlock LeaseAcquireBlockType = iota + // LeaseAcquireFreshestBlock denotes the LeaseAcquireResultBlockEvent is + // from tableState.acquireFreshestFromStoreLocked(). + LeaseAcquireFreshestBlock +) + // LeaseStoreTestingKnobs contains testing knobs. type LeaseStoreTestingKnobs struct { // Called after a lease is removed from the store, with any operation error. @@ -938,6 +991,9 @@ type LeaseStoreTestingKnobs struct { LeaseReleasedEvent func(table sqlbase.TableDescriptor, err error) // Called after a lease is acquired, with any operation error. LeaseAcquiredEvent func(table sqlbase.TableDescriptor, err error) + // Called before waiting on a results from a DoChan call of acquireNodeLease + // in tableState.acquire() and tableState.acquireFreshestFromStoreLocked(). + LeaseAcquireResultBlockEvent func(leaseBlockType LeaseAcquireBlockType) // RemoveOnceDereferenced forces leases to be removed // as soon as they are dereferenced. RemoveOnceDereferenced bool diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 7229072cbc76..a877a27680f2 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -19,16 +19,19 @@ package sql import ( "fmt" "sync" + "sync/atomic" "testing" "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" ) func TestTableSet(t *testing.T) { @@ -411,7 +414,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Try to trigger the race repeatedly: race an AcquireByName against a // Release. - // tableChan acts as a barrier, synchornizing the two routines at every + // tableChan acts as a barrier, synchronizing the two routines at every // iteration. tableChan := make(chan *sqlbase.TableDescriptor) errChan := make(chan error) @@ -558,3 +561,212 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); wg.Wait() } + +// Test one possible outcome of a race between a lease acquisition (the first +// case through tableState.acquire(), the second through +// tableState.acquireFreshestFromStore()) and a release of the lease that was +// just acquired. Precisely: +// 1. Thread 1 calls either acquireFreshestFromStore() or acquire(). +// 2. Thread 1 releases the lock on tableState and starts acquisition of a lease +// from the store, blocking until it's finished. +// 3. Thread 2 calls acquire(). The lease has not been acquired yet, so it +// also enters the acquisition code path (calling DoChan). +// 4. Thread 2 proceeds to release the lock on tableState waiting for the +// in-flight acquisition. +// 4. The lease is acquired from the store and the waiting routines are +// unblocked. +// 5. Thread 2 unblocks first, and releases the new lease, for whatever reason. +// 5. Thread 1 wakes up. At this point, a naive implementation would use the +// newly acquired lease, which would be incorrect. The test checks that +// acquireFreshestFromStore() or acquire() notices, after re-acquiring the +// tableState lock, that the new lease has been released and acquires a new +// one. +func TestLeaseAcquireAndReleaseConcurrenctly(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Result is a struct for moving results to the main result routine. + type Result struct { + table *sqlbase.TableDescriptor + exp hlc.Timestamp + err error + } + + descID := sqlbase.ID(keys.LeaseTableID) + + // acquireBlock calls Acquire. + acquireBlock := func( + ctx context.Context, + m *LeaseManager, + acquireChan chan Result, + ) { + table, e, err := m.Acquire(ctx, m.clock.Now(), descID) + acquireChan <- Result{err: err, exp: e, table: table} + } + + testCases := []struct { + name string + // Whether the second routine is a call to LeaseManager.acquireFreshest or + // not. This determines which channel we unblock. + isSecondCallAcquireFreshest bool + }{ + + // Checks what happens when the race between between acquire() and + // lease release occurs. + { + name: "CallAcquireConcurrently", + isSecondCallAcquireFreshest: false, + }, + // Checks what happens when the race between + // acquireFreshestFromStore() and lease release occurs. + { + name: "CallAcquireFreshestAndAcquireConcurrently", + isSecondCallAcquireFreshest: true, + }, + } + + for _, test := range testCases { + ctx := log.WithLogTag(context.Background(), "test: Lease", nil) + + t.Run(test.name, func(t *testing.T) { + // blockChan and freshestBlockChan is used to set up the race condition. + blockChan := make(chan struct{}) + freshestBlockChan := make(chan struct{}) + // acquisitionBlock is used to prevent acquireNodeLease from + // completing, to force a lease to delay its acquisition. + acquisitionBlock := make(chan struct{}) + + // preblock is used for the main routine to wait for all acquisition + // routines to catch up. + var preblock sync.WaitGroup + // acquireArrivals and acquireFreshestArrivals tracks how many times + // we've arrived at the knob codepath for the corresponding functions. + // This is needed because the fix to the race condition hits the knob more + // than once in a single routine, so we need to ignore any extra passes. + var acquireArrivals int32 + var acquireFreshestArrivals int32 + // leasesAcquiredCount counts how many leases were acquired in total. + var leasesAcquiredCount int32 + + removalTracker := NewLeaseRemovalTracker() + testingKnobs := base.TestingKnobs{ + SQLLeaseManager: &LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{ + RemoveOnceDereferenced: true, + LeaseReleasedEvent: removalTracker.LeaseRemovedNotification, + LeaseAcquireResultBlockEvent: func(leaseBlockType LeaseAcquireBlockType) { + if leaseBlockType == LeaseAcquireBlock { + if count := atomic.LoadInt32(&acquireArrivals); (count < 1 && test.isSecondCallAcquireFreshest) || + (count < 2 && !test.isSecondCallAcquireFreshest) { + atomic.AddInt32(&acquireArrivals, 1) + preblock.Done() + <-blockChan + } + } else if leaseBlockType == LeaseAcquireFreshestBlock { + if atomic.LoadInt32(&acquireFreshestArrivals) < 1 { + atomic.AddInt32(&acquireFreshestArrivals, 1) + preblock.Done() + <-freshestBlockChan + } + } + }, + LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) { + atomic.AddInt32(&leasesAcquiredCount, 1) + <-acquisitionBlock + }, + }, + }, + } + + serverArgs := base.TestServerArgs{Knobs: testingKnobs} + + // Set the lease jitter so leases will have monotonically + // increasing expiration. This prevents two leases from having the + // same expiration due to randomness, as the leases are checked + // for having a different expiration. + leaseJitterMultiplier := 0.0 + serverArgs.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = &leaseJitterMultiplier + + s, _, _ := serverutils.StartServer( + t, serverArgs) + defer s.Stopper().Stop(context.TODO()) + leaseManager := s.LeaseManager().(*LeaseManager) + + acquireResultChan := make(chan Result) + + // Start two routines to acquire and release. + preblock.Add(2) + go acquireBlock(ctx, leaseManager, acquireResultChan) + if test.isSecondCallAcquireFreshest { + go func(ctx context.Context, m *LeaseManager, acquireChan chan Result) { + table, e, err := m.acquireFreshestFromStore(ctx, descID) + acquireChan <- Result{err: err, exp: e, table: table} + }(ctx, leaseManager, acquireResultChan) + + } else { + go acquireBlock(ctx, leaseManager, acquireResultChan) + } + + // Wait until both routines arrive. + preblock.Wait() + + // Allow the acquisition to finish. By delaying it until now, we guarantee + // both routines will receive the same lease. + acquisitionBlock <- struct{}{} + + // Allow the first routine to finish acquisition. In the case where both + // routines are calling Acquire(), first refers to whichever routine + // continues, order does not matter. + blockChan <- struct{}{} + // Wait for the first routine's results. + result1 := <-acquireResultChan + if result1.err != nil { + t.Fatal(result1.err) + } + + // Release the lease. This also causes it to get removed as the + // knob RemoveOnceDereferenced is set. + tracker := removalTracker.TrackRemoval(result1.table) + if err := leaseManager.Release(result1.table); err != nil { + t.Fatal(err) + } + // Wait until the lease is removed. + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + + // Allow the second routine to proceed. + if test.isSecondCallAcquireFreshest { + freshestBlockChan <- struct{}{} + } else { + blockChan <- struct{}{} + } + + // Allow all future acquisitions to complete. + close(acquisitionBlock) + + // Get the acquisition results of the second routine. + result2 := <-acquireResultChan + if result2.err != nil { + t.Fatal(result2.err) + } + + // Release the lease. This also causes it to get removed as the + // knob RemoveOnceDereferenced is set + tracker = removalTracker.TrackRemoval(result2.table) + if err := leaseManager.Release(result2.table); err != nil { + t.Fatal(err) + } + // Wait until the lease is removed. + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + + if result1.table == result2.table && result1.exp == result2.exp { + t.Fatalf("Expected the leases to be different. TableDescriptor pointers are equal and both the same expiration") + } + if count := atomic.LoadInt32(&leasesAcquiredCount); count != 2 { + t.Fatalf("Expected to acquire 2 leases, instead got %d", count) + } + }) + } +} diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index e1bccfeb79d6..759f54bfdced 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -227,8 +227,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.