diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index b64735beef6d..78bb02579094 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -544,6 +545,7 @@ type tableState struct { mu struct { syncutil.Mutex + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest @@ -554,11 +556,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -574,17 +571,21 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { + // Acquire a lease if no lease exists or if the latest lease is about to + // expire. Repeatedly check to ensure either a lease is acquired or we receive + // an error. Checking we have a valid lease after acquisition is required + // if the acquiring routine immediately releases the lease. + for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + t.mu.Unlock() + _, _, err := t.mu.group.Do("acquire", func() (interface{}, error) { + return nil, t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Lock() + if err != nil { return nil, err } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -697,20 +698,43 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // Continue to attempt to call to acquireNodeLease until our call was the only + // one made by singleflight. This busy-waits while other acquisitions are + // underway. + for { + // Move forward the expiry to acquire a fresh table lease. + + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active . + // TODO(vivek): the expiration time is no longer needed to sort the + // tableVersionState. Get rid of this. + minExpirationTime := hlc.Timestamp{} + newestTable := t.mu.active.findNewest() + if newestTable != nil { + minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + t.mu.Unlock() + // hasAcquiredFreshest is used to determine if our function was the one + // called by group.Do. The shared bool returned by Do is not suffice as it + // returns true if any other calls to Do use the result, not if our function + // returns. If two routines simultaneously call Do here, they endlessly + // retry. + hasAcquiredFreshest := false + _, _, err := t.mu.group.Do("acquire", func() (interface{}, error) { + hasAcquiredFreshest = true + return nil, t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Lock() + // Before returning we need to check if the newest lease has been released + // immediately from an other routine that raced to lock t.mu. + // We also need to check if the table was dropped before locking t.mu. + if err != nil { + return nil + } else if s := t.mu.active.findNewest(); hasAcquiredFreshest && s != nil { + return err + } } - - return t.acquireNodeLease(ctx, m, minExpirationTime) } // upsertLocked inserts a lease for a particular table version. @@ -757,18 +781,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. - for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. // minExpirationTime, if not set to the zero value, will be used as a lower @@ -776,29 +788,18 @@ func (t *tableState) acquireWait() { // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, ) error { if m.isDraining() { return errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. - t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) - t.mu.Lock() if err != nil { return err } + t.mu.Lock() + defer t.mu.Unlock() t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) return nil diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 7229072cbc76..9cd93ceacf77 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -411,7 +411,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Try to trigger the race repeatedly: race an AcquireByName against a // Release. - // tableChan acts as a barrier, synchornizing the two routines at every + // tableChan acts as a barrier, synchronizing the two routines at every // iteration. tableChan := make(chan *sqlbase.TableDescriptor) errChan := make(chan error) @@ -558,3 +558,256 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); wg.Wait() } + +// Test for a race where after acquiring a lease with acquireFreshestFromStore and +// acquire simultaneously, acquire releases the lease before +// acquireFreshestFromStore completes. This happens because of the lock handoff +// to the async acquireNodeLease. +// The error manifests as a panic in acquireFreshestFromStore. +func TestLeaseReleaseImmediatelyWithAcquireFreshestFromStore(t *testing.T) { + defer leaktest.AfterTest(t)() + removalTracker := NewLeaseRemovalTracker() + testingKnobs := base.TestingKnobs{ + SQLLeaseManager: &LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{ + LeaseReleasedEvent: removalTracker.LeaseRemovedNotification, + }, + }, + } + s, sqlDB, kvDB := serverutils.StartServer( + t, base.TestServerArgs{Knobs: testingKnobs}) + defer s.Stopper().Stop(context.TODO()) + leaseManager := s.LeaseManager().(*LeaseManager) + + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); +`); err != nil { + t.Fatal(err) + } + + tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") + + // Populate the name cache. + ctx := context.TODO() + table, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test") + if err != nil { + t.Fatal(err) + } + if err := leaseManager.Release(table); err != nil { + t.Fatal(err) + } + + // Pretend the table has been dropped, so that when we release leases on it, + // they are removed from the tableNameCache too. + tableState := leaseManager.findTableState(tableDesc.ID, true) + tableState.mu.Lock() + tableState.mu.dropped = true + tableState.mu.Unlock() + + // Try to trigger the race repeatedly: race an acquireFreshestFromStore + // against a AquireByName and Release. + // tableChan acts as a barrier, synchronizing the two routines at every + // iteration. + tableChan := make(chan bool) + type Result struct { + table *sqlbase.TableDescriptor + err error + } + resultChan := make(chan Result) + go func() { + for range tableChan { + result := Result{} + tableByName, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test") + if err != nil { + // Move errors to the main goroutine. + result.err = err + resultChan <- result + } + result.table = tableByName + + // Move errors to the main goroutine. + result.err = leaseManager.Release(tableByName) + resultChan <- result + } + }() + + for i := 0; i < 50; i++ { + ctx := context.TODO() + + // This test will need to wait until leases are removed from the store + // before creating new leases because the jitter used in the leases' + // expiration causes duplicate key errors when trying to create new + // leases. This is not a problem in production, since leases are not + // removed from the store until they expire, and the jitter is small + // compared to their lifetime, but it is a problem in this test because + // we churn through leases quickly. + tracker := removalTracker.TrackRemoval(table) + // Start the race: signal the other guy to release, and we do a + // acquireFreshestFromStore at the same time. + tableChan <- true + freshestTable, _, err := leaseManager.acquireFreshestFromStore(ctx, tableDesc.ID) + if err != nil { + t.Fatal(err) + } + tracker2 := removalTracker.TrackRemoval(freshestTable) + // See if there was an error releasing lease. + result := <-resultChan + if err != nil { + t.Fatal(err) + } + + // Depending on how the race went, there are a few cases: + // - acquireFreshestFromStore began, then AcquireByName got the finished lease + // and released it before acquireFreshestFromStore finished. This is the + // race we are testing for. + // - acquireFreshestFromStore finished before Release was called. + // - AcquireByName ran first, got a leaase then acquireFreshestFromStore got + // a different lease. We don't care about Release as it released a different + // lease. + if freshestTable.ID == result.table.ID { + // Case 1 and 2 + if err := leaseManager.Release(freshestTable); err != nil { + t.Fatal(err) + } + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + } else { + // Case 3 + if err := leaseManager.Release(freshestTable); err != nil { + t.Fatal(err) + } + if err := tracker2.WaitForRemoval(); err != nil { + t.Fatal(err) + } + } + } + close(tableChan) +} + +// Test for a race where after acquiring a lease with two acquires +// simultaneously, one routine releases the lease before the other completes. +// This happens because of the lock handoff to the async acquireNodeLease. +// The error manifests as a panic in tableState.acquire from findForTimestamp. +func TestLeaseReleaseImmediatelyWithConcurrentAcquires(t *testing.T) { + defer leaktest.AfterTest(t)() + removalTracker := NewLeaseRemovalTracker() + testingKnobs := base.TestingKnobs{ + SQLLeaseManager: &LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{ + LeaseReleasedEvent: removalTracker.LeaseRemovedNotification, + }, + }, + } + s, sqlDB, kvDB := serverutils.StartServer( + t, base.TestServerArgs{Knobs: testingKnobs}) + defer s.Stopper().Stop(context.TODO()) + leaseManager := s.LeaseManager().(*LeaseManager) + + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); +`); err != nil { + t.Fatal(err) + } + + tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") + + // Populate the name cache. + ctx := context.TODO() + table, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test") + if err != nil { + t.Fatal(err) + } + if err := leaseManager.Release(table); err != nil { + t.Fatal(err) + } + + // Pretend the table has been dropped, so that when we release leases on it, + // they are removed from the tableNameCache too. + tableState := leaseManager.findTableState(tableDesc.ID, true) + tableState.mu.Lock() + tableState.mu.dropped = true + tableState.mu.Unlock() + + // Try to trigger the race repeatedly: race a second AcquireByName + // against a AquireByName and Release. + // tableChan acts as a barrier, synchronizing the two routines at every + // iteration. + tableChan := make(chan bool) + type Result struct { + table *sqlbase.TableDescriptor + err error + } + resultChan := make(chan Result) + go func() { + for range tableChan { + result := Result{} + tableByName, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test") + if err != nil { + // Move errors to the main goroutine. + result.err = err + resultChan <- result + } + result.table = tableByName + + // Move errors to the main goroutine. + result.err = leaseManager.Release(tableByName) + resultChan <- result + } + }() + + for i := 0; i < 50; i++ { + ctx := context.TODO() + + // This test will need to wait until leases are removed from the store + // before creating new leases because the jitter used in the leases' + // expiration causes duplicate key errors when trying to create new + // leases. This is not a problem in production, since leases are not + // removed from the store until they expire, and the jitter is small + // compared to their lifetime, but it is a problem in this test because + // we churn through leases quickly. + tracker := removalTracker.TrackRemoval(table) + // Start the race: signal the other guy to release, and we do a + // acquireFreshestFromStore at the same time. + tableChan <- true + tableByName, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test") + // If we failed to handle the race condition, this call will error when + // calling t.findForTimestamp as no valid lease exists. + if err != nil { + t.Fatal(err) + } + tracker2 := removalTracker.TrackRemoval(tableByName) + // See if there was an error releasing lease. + result := <-resultChan + if err != nil { + t.Fatal(err) + } + + // Depending on how the race went, there are a few cases: + // - One acquire begins, then the second acquire begins, completes, and + // releases before the first finishes. This is the + // race we are testing for. + // - One acquire begins and completes before the second is able to release + // the lease. + if tableByName.ID == result.table.ID { + // Case 1. In a succesfull case, we've gotten a different lease. + if err := leaseManager.Release(tableByName); err != nil { + t.Fatal(err) + } + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + } else { + // Case 2. + if err := leaseManager.Release(tableByName); err != nil { + t.Fatal(err) + } + if err := tracker2.WaitForRemoval(); err != nil { + t.Fatal(err) + } + } + } + close(tableChan) +} diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index e5c79a1c149a..6dc6a5525ed7 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.