diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 29873bf14f1c..1e53c83d494c 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -447,6 +448,11 @@ func (s LeaseStore) getForExpiration( return table, err } +// leaseToken is an opaque token representing a lease. It's distinct from a +// lease to define restricted capabilities and prevent improper use of a lease +// where we instead have leaseTokens. +type leaseToken *tableVersionState + // tableSet maintains an ordered set of tableVersionState objects sorted // by version. It supports addition and removal of elements, finding the // table for a particular version, or finding the most recent table version. @@ -472,6 +478,15 @@ func (l *tableSet) String() string { return buf.String() } +// isNewest checks if the leaseToken represents the newest lease in the +// tableSet. +func (l *tableSet) isNewest(t leaseToken) bool { + if len(l.data) == 0 { + return false + } + return leaseToken(l.data[len(l.data)-1]) == t +} + func (l *tableSet) insert(s *tableVersionState) { i, match := l.findIndex(s.Version) if match { @@ -542,6 +557,8 @@ func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionS return nil } +const acquireGroupKey = "acquire" + type tableState struct { id sqlbase.ID // The cache is updated every time we acquire or release a table. @@ -551,6 +568,9 @@ type tableState struct { mu struct { syncutil.Mutex + // group is used for all calls made to acquireNodeLease to prevent + // concurrent lease acquisitions from the store. + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest // entry. There may be more than one active lease when the system is @@ -560,11 +580,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -580,17 +595,26 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { - return nil, err + // Acquire a lease if no lease exists or if the latest lease is about to + // expire. Looping is necessary because lease acquisition is done without + // holding the tableState lock, so anything can happen in between lease + // acquisition and us getting control again. + for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + var resultChan <-chan singleflight.Result + resultChan, _ = t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireBlock) + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return nil, result.Err } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -703,20 +727,59 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // We need to acquire a lease on a "fresh" descriptor, meaning that joining + // a potential in-progress lease acquisition is generally not good enough. + // If we are to join an in-progress acquisition, it needs to be an acquisition + // initiated after this point. + // So, we handle two cases: + // 1. The first DoChan() call tells us that we didn't join an in-progress + // acquisition. Great, the lease that's being acquired is good. + // 2. The first DoChan() call tells us that we did join an in-progress acq. + // We have to wait this acquisition out; it's not good for us. But any + // future acquisition is good, so the next time around the loop it doesn't + // matter if we initiate a request or join an in-progress one. + // In both cases, we need to check if the lease we want is still valid because + // lease acquisition is done without holding the tableState lock, so anything + // can happen in between lease acquisition and us getting control again. + attemptsMade := 0 + for { + // Move forward the expiry to acquire a fresh table lease. + + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active. + // TODO(vivek): the expiration time is no longer needed to sort the + // tableVersionState. Get rid of this. + minExpirationTime := hlc.Timestamp{} + s := t.mu.active.findNewest() + if s != nil { + minExpirationTime = s.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + resultChan, wasCalled := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireFreshestBlock) + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return result.Err + } + l := result.Val.(leaseToken) + if wasCalled && t.mu.active.isNewest(l) { + // Case 1: we didn't join an in-progress call and the lease is still + // valid. + break + } else if attemptsMade > 1 && t.mu.active.isNewest(l) { + // Case 2: more than one acquisition has happened and the lease is still + // valid. + break + } + attemptsMade++ } - - return t.acquireNodeLease(ctx, m, minExpirationTime) + return nil } // upsertLocked inserts a lease for a particular table version. @@ -763,18 +826,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. - for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. // minExpirationTime, if not set to the zero value, will be used as a lower @@ -782,32 +833,21 @@ func (t *tableState) acquireWait() { // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, -) error { +) (leaseToken, error) { if m.isDraining() { - return errors.New("cannot acquire lease when draining") + return nil, errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. - t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) - t.mu.Lock() if err != nil { - return err + return nil, err } + t.mu.Lock() + defer t.mu.Unlock() t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) - return nil + return leaseToken(table), nil } func (t *tableState) release(table *sqlbase.TableDescriptor, m *LeaseManager) error { @@ -931,6 +971,19 @@ func (t *tableState) purgeOldVersions( return err } +// LeaseAcquireBlockType is the type of blocking result event when +// calling LeaseAcquireResultBlockEvent. +type LeaseAcquireBlockType int + +const ( + // LeaseAcquireBlock denotes the LeaseAcquireResultBlockEvent is + // coming from tableState.acquire(). + LeaseAcquireBlock LeaseAcquireBlockType = iota + // LeaseAcquireFreshestBlock denotes the LeaseAcquireResultBlockEvent is + // from tableState.acquireFreshestFromStoreLocked(). + LeaseAcquireFreshestBlock +) + // LeaseStoreTestingKnobs contains testing knobs. type LeaseStoreTestingKnobs struct { // Called after a lease is removed from the store, with any operation error. @@ -938,6 +991,9 @@ type LeaseStoreTestingKnobs struct { LeaseReleasedEvent func(table sqlbase.TableDescriptor, err error) // Called after a lease is acquired, with any operation error. LeaseAcquiredEvent func(table sqlbase.TableDescriptor, err error) + // Called before waiting on a results from a DoChan call of acquireNodeLease + // in tableState.acquire() and tableState.acquireFreshestFromStoreLocked(). + LeaseAcquireResultBlockEvent func(leaseBlockType LeaseAcquireBlockType) // RemoveOnceDereferenced forces leases to be removed // as soon as they are dereferenced. RemoveOnceDereferenced bool diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 7229072cbc76..a877a27680f2 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -19,16 +19,19 @@ package sql import ( "fmt" "sync" + "sync/atomic" "testing" "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" ) func TestTableSet(t *testing.T) { @@ -411,7 +414,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Try to trigger the race repeatedly: race an AcquireByName against a // Release. - // tableChan acts as a barrier, synchornizing the two routines at every + // tableChan acts as a barrier, synchronizing the two routines at every // iteration. tableChan := make(chan *sqlbase.TableDescriptor) errChan := make(chan error) @@ -558,3 +561,212 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); wg.Wait() } + +// Test one possible outcome of a race between a lease acquisition (the first +// case through tableState.acquire(), the second through +// tableState.acquireFreshestFromStore()) and a release of the lease that was +// just acquired. Precisely: +// 1. Thread 1 calls either acquireFreshestFromStore() or acquire(). +// 2. Thread 1 releases the lock on tableState and starts acquisition of a lease +// from the store, blocking until it's finished. +// 3. Thread 2 calls acquire(). The lease has not been acquired yet, so it +// also enters the acquisition code path (calling DoChan). +// 4. Thread 2 proceeds to release the lock on tableState waiting for the +// in-flight acquisition. +// 4. The lease is acquired from the store and the waiting routines are +// unblocked. +// 5. Thread 2 unblocks first, and releases the new lease, for whatever reason. +// 5. Thread 1 wakes up. At this point, a naive implementation would use the +// newly acquired lease, which would be incorrect. The test checks that +// acquireFreshestFromStore() or acquire() notices, after re-acquiring the +// tableState lock, that the new lease has been released and acquires a new +// one. +func TestLeaseAcquireAndReleaseConcurrenctly(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Result is a struct for moving results to the main result routine. + type Result struct { + table *sqlbase.TableDescriptor + exp hlc.Timestamp + err error + } + + descID := sqlbase.ID(keys.LeaseTableID) + + // acquireBlock calls Acquire. + acquireBlock := func( + ctx context.Context, + m *LeaseManager, + acquireChan chan Result, + ) { + table, e, err := m.Acquire(ctx, m.clock.Now(), descID) + acquireChan <- Result{err: err, exp: e, table: table} + } + + testCases := []struct { + name string + // Whether the second routine is a call to LeaseManager.acquireFreshest or + // not. This determines which channel we unblock. + isSecondCallAcquireFreshest bool + }{ + + // Checks what happens when the race between between acquire() and + // lease release occurs. + { + name: "CallAcquireConcurrently", + isSecondCallAcquireFreshest: false, + }, + // Checks what happens when the race between + // acquireFreshestFromStore() and lease release occurs. + { + name: "CallAcquireFreshestAndAcquireConcurrently", + isSecondCallAcquireFreshest: true, + }, + } + + for _, test := range testCases { + ctx := log.WithLogTag(context.Background(), "test: Lease", nil) + + t.Run(test.name, func(t *testing.T) { + // blockChan and freshestBlockChan is used to set up the race condition. + blockChan := make(chan struct{}) + freshestBlockChan := make(chan struct{}) + // acquisitionBlock is used to prevent acquireNodeLease from + // completing, to force a lease to delay its acquisition. + acquisitionBlock := make(chan struct{}) + + // preblock is used for the main routine to wait for all acquisition + // routines to catch up. + var preblock sync.WaitGroup + // acquireArrivals and acquireFreshestArrivals tracks how many times + // we've arrived at the knob codepath for the corresponding functions. + // This is needed because the fix to the race condition hits the knob more + // than once in a single routine, so we need to ignore any extra passes. + var acquireArrivals int32 + var acquireFreshestArrivals int32 + // leasesAcquiredCount counts how many leases were acquired in total. + var leasesAcquiredCount int32 + + removalTracker := NewLeaseRemovalTracker() + testingKnobs := base.TestingKnobs{ + SQLLeaseManager: &LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{ + RemoveOnceDereferenced: true, + LeaseReleasedEvent: removalTracker.LeaseRemovedNotification, + LeaseAcquireResultBlockEvent: func(leaseBlockType LeaseAcquireBlockType) { + if leaseBlockType == LeaseAcquireBlock { + if count := atomic.LoadInt32(&acquireArrivals); (count < 1 && test.isSecondCallAcquireFreshest) || + (count < 2 && !test.isSecondCallAcquireFreshest) { + atomic.AddInt32(&acquireArrivals, 1) + preblock.Done() + <-blockChan + } + } else if leaseBlockType == LeaseAcquireFreshestBlock { + if atomic.LoadInt32(&acquireFreshestArrivals) < 1 { + atomic.AddInt32(&acquireFreshestArrivals, 1) + preblock.Done() + <-freshestBlockChan + } + } + }, + LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) { + atomic.AddInt32(&leasesAcquiredCount, 1) + <-acquisitionBlock + }, + }, + }, + } + + serverArgs := base.TestServerArgs{Knobs: testingKnobs} + + // Set the lease jitter so leases will have monotonically + // increasing expiration. This prevents two leases from having the + // same expiration due to randomness, as the leases are checked + // for having a different expiration. + leaseJitterMultiplier := 0.0 + serverArgs.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = &leaseJitterMultiplier + + s, _, _ := serverutils.StartServer( + t, serverArgs) + defer s.Stopper().Stop(context.TODO()) + leaseManager := s.LeaseManager().(*LeaseManager) + + acquireResultChan := make(chan Result) + + // Start two routines to acquire and release. + preblock.Add(2) + go acquireBlock(ctx, leaseManager, acquireResultChan) + if test.isSecondCallAcquireFreshest { + go func(ctx context.Context, m *LeaseManager, acquireChan chan Result) { + table, e, err := m.acquireFreshestFromStore(ctx, descID) + acquireChan <- Result{err: err, exp: e, table: table} + }(ctx, leaseManager, acquireResultChan) + + } else { + go acquireBlock(ctx, leaseManager, acquireResultChan) + } + + // Wait until both routines arrive. + preblock.Wait() + + // Allow the acquisition to finish. By delaying it until now, we guarantee + // both routines will receive the same lease. + acquisitionBlock <- struct{}{} + + // Allow the first routine to finish acquisition. In the case where both + // routines are calling Acquire(), first refers to whichever routine + // continues, order does not matter. + blockChan <- struct{}{} + // Wait for the first routine's results. + result1 := <-acquireResultChan + if result1.err != nil { + t.Fatal(result1.err) + } + + // Release the lease. This also causes it to get removed as the + // knob RemoveOnceDereferenced is set. + tracker := removalTracker.TrackRemoval(result1.table) + if err := leaseManager.Release(result1.table); err != nil { + t.Fatal(err) + } + // Wait until the lease is removed. + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + + // Allow the second routine to proceed. + if test.isSecondCallAcquireFreshest { + freshestBlockChan <- struct{}{} + } else { + blockChan <- struct{}{} + } + + // Allow all future acquisitions to complete. + close(acquisitionBlock) + + // Get the acquisition results of the second routine. + result2 := <-acquireResultChan + if result2.err != nil { + t.Fatal(result2.err) + } + + // Release the lease. This also causes it to get removed as the + // knob RemoveOnceDereferenced is set + tracker = removalTracker.TrackRemoval(result2.table) + if err := leaseManager.Release(result2.table); err != nil { + t.Fatal(err) + } + // Wait until the lease is removed. + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + + if result1.table == result2.table && result1.exp == result2.exp { + t.Fatalf("Expected the leases to be different. TableDescriptor pointers are equal and both the same expiration") + } + if count := atomic.LoadInt32(&leasesAcquiredCount); count != 2 { + t.Fatalf("Expected to acquire 2 leases, instead got %d", count) + } + }) + } +} diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index e1bccfeb79d6..759f54bfdced 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -227,8 +227,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.