From 3a2bf083b9a0011d55b2f28a4ca78178da2e3487 Mon Sep 17 00:00:00 2001
From: Joey Pereira
Date: Fri, 29 Sep 2017 14:20:37 -0400
Subject: [PATCH] sql: Refactor TableDescriptor lease acquisition to use
 singleflight

This changes the acquisition of leases to use singleflight to reduce the
complexity of handling concurrent acquisitions. The behaviour is kept the
same, as singleflight.Do blocks until the acquisition completes.

The change is in preparation for converting lease acquisition to happen
asynchronously using singleflight.DoChan.

This also refactors the jitter multiplier out into a constant.
---
 pkg/sql/lease.go               |  99 ++++++-------
 pkg/sql/lease_internal_test.go | 255 ++++++++++++++++++++++++++++++++-
 pkg/sql/lease_test.go          |   4 +-
 3 files changed, 307 insertions(+), 51 deletions(-)

diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go
index b64735beef6d..0b2b2cec3d93 100644
--- a/pkg/sql/lease.go
+++ b/pkg/sql/lease.go
@@ -38,6 +38,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
 
@@ -544,6 +545,7 @@ type tableState struct {
 
 	mu struct {
 		syncutil.Mutex
+		group singleflight.Group
 
 		// table descriptors sorted by increasing version. This set always
 		// contains a table descriptor version with a lease as the latest
@@ -554,11 +556,6 @@ type tableState struct {
 		// entry is created with the expiration time of the new lease and
 		// the older entry is removed.
 		active tableSet
-		// A channel used to indicate whether a lease is actively being
-		// acquired. nil if there is no lease acquisition in progress for
-		// the table. If non-nil, the channel will be closed when lease
-		// acquisition completes.
-		acquiring chan struct{}
 		// Indicates that the table has been dropped, or is being dropped.
 		// If set, leases are released from the store as soon as their
 		// refcount drops to 0, as opposed to waiting until they expire.
@@ -574,17 +571,21 @@ func (t *tableState) acquire(
 ) (*tableVersionState, error) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
-	// Wait for any existing lease acquisition.
-	t.acquireWait()
-	// Acquire a lease if no lease exists or if the latest lease is
-	// about to expire.
-	if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
-		if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
+	// Acquire a lease if no lease exists or if the latest lease is about to
+	// expire. Loop until either a valid lease is acquired or an error occurs.
+	// Re-checking for a valid lease after acquisition is required because the
+	// acquiring routine may release the lease immediately.
+	for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() {
+		t.mu.Unlock()
+		_, _, err := t.mu.group.Do("acquire", func() (interface{}, error) {
+			return nil, t.acquireNodeLease(ctx, m, hlc.Timestamp{})
+		})
+		t.mu.Lock()
+		if err != nil {
 			return nil, err
 		}
 	}
-
 	return t.findForTimestamp(ctx, timestamp, m)
 }
 
@@ -697,20 +698,43 @@ func (t *tableState) findForTimestamp(
 //
 // t.mu must be locked.
 func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error {
-	// Ensure there is no lease acquisition in progress.
-	t.acquireWait()
-
-	// Move forward to acquire a fresh table lease.
+	// Continue attempting to call acquireNodeLease until our call was the one
+	// actually executed by singleflight. This busy-waits while other
+	// acquisitions are underway.
+	for {
+		// Move the expiry forward to acquire a fresh table lease.
+
+		// Set the min expiration time to guarantee that the lease acquired is the
+		// last lease in t.mu.active.
+		// TODO(vivek): the expiration time is no longer needed to sort the
+		// tableVersionState. Get rid of this.
+		minExpirationTime := hlc.Timestamp{}
+		newestTable := t.mu.active.findNewest()
+		if newestTable != nil {
+			minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
+		}
 
-	// Set the min expiration time to guarantee that the lease acquired is the
-	// last lease in t.mu.active .
-	minExpirationTime := hlc.Timestamp{}
-	newestTable := t.mu.active.findNewest()
-	if newestTable != nil {
-		minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
+		t.mu.Unlock()
+		// hasAcquiredFreshest is used to determine whether our function was the
+		// one executed by group.Do. The shared bool returned by Do does not
+		// suffice, as it reports whether the result was shared with other
+		// callers, not whether our function ran. If we relied on it, two
+		// routines calling Do here simultaneously would retry endlessly.
+		hasAcquiredFreshest := false
+		_, _, err := t.mu.group.Do("acquire", func() (interface{}, error) {
+			hasAcquiredFreshest = true
+			return nil, t.acquireNodeLease(ctx, m, minExpirationTime)
+		})
+		t.mu.Lock()
+		// Before returning we need to check whether the newest lease has been
+		// released immediately by another routine that raced to lock t.mu.
+		// We also need to check if the table was dropped before locking t.mu.
+		if err != nil {
+			return err
+		} else if s := t.mu.active.findNewest(); hasAcquiredFreshest && s != nil {
+			return err
+		}
+	}
-
-	return t.acquireNodeLease(ctx, m, minExpirationTime)
 }
 
 // upsertLocked inserts a lease for a particular table version.
@@ -757,18 +781,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) {
 	}
 }
 
-// acquireWait waits until no lease acquisition is in progress.
-func (t *tableState) acquireWait() {
-	// Spin until no lease acquisition is in progress.
-	for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring {
-		// We're called with mu locked, but need to unlock it while we wait
-		// for the in-progress lease acquisition to finish.
-		t.mu.Unlock()
-		<-acquiring
-		t.mu.Lock()
-	}
-}
-
 // If the lease cannot be obtained because the descriptor is in the process of
 // being dropped, the error will be errTableDropped.
 // minExpirationTime, if not set to the zero value, will be used as a lower
@@ -776,29 +788,18 @@ func (t *tableState) acquireWait() {
 // jitter in the expiration time, and guarantee that we get a lease that will be
 // inserted at the end of the lease set (i.e. it will be returned by
 // findNewest() from now on).
-//
-// t.mu needs to be locked.
 func (t *tableState) acquireNodeLease(
 	ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp,
 ) error {
 	if m.isDraining() {
 		return errors.New("cannot acquire lease when draining")
 	}
-
-	// Notify when lease has been acquired.
-	t.mu.acquiring = make(chan struct{})
-	defer func() {
-		close(t.mu.acquiring)
-		t.mu.acquiring = nil
-	}()
-	// We're called with mu locked, but need to unlock it during lease
-	// acquisition.
-	t.mu.Unlock()
 	table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime)
-	t.mu.Lock()
 	if err != nil {
 		return err
 	}
+	t.mu.Lock()
+	defer t.mu.Unlock()
 	t.upsertLocked(ctx, table, m)
 	t.tableNameCache.insert(table)
 	return nil
diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go
index 7229072cbc76..9cd93ceacf77 100644
--- a/pkg/sql/lease_internal_test.go
+++ b/pkg/sql/lease_internal_test.go
@@ -411,7 +411,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
 
 	// Try to trigger the race repeatedly: race an AcquireByName against a
 	// Release.
-	// tableChan acts as a barrier, synchornizing the two routines at every
+	// tableChan acts as a barrier, synchronizing the two routines at every
 	// iteration.
 	tableChan := make(chan *sqlbase.TableDescriptor)
 	errChan := make(chan error)
@@ -558,3 +558,256 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
 
 	wg.Wait()
 }
+
+// Test for a race where, when acquireFreshestFromStore and acquire run
+// simultaneously, acquire releases the lease before acquireFreshestFromStore
+// completes. This happens because of the lock handoff to the async
+// acquireNodeLease.
+// The error manifests as a panic in acquireFreshestFromStore.
+func TestLeaseReleaseImmediatelyWithAcquireFreshestFromStore(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	removalTracker := NewLeaseRemovalTracker()
+	testingKnobs := base.TestingKnobs{
+		SQLLeaseManager: &LeaseManagerTestingKnobs{
+			LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{
+				LeaseReleasedEvent: removalTracker.LeaseRemovedNotification,
+			},
+		},
+	}
+	s, sqlDB, kvDB := serverutils.StartServer(
+		t, base.TestServerArgs{Knobs: testingKnobs})
+	defer s.Stopper().Stop(context.TODO())
+	leaseManager := s.LeaseManager().(*LeaseManager)
+
+	if _, err := sqlDB.Exec(`
+CREATE DATABASE t;
+CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
+`); err != nil {
+		t.Fatal(err)
+	}
+
+	tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")
+
+	// Populate the name cache.
+	ctx := context.TODO()
+	table, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := leaseManager.Release(table); err != nil {
+		t.Fatal(err)
+	}
+
+	// Pretend the table has been dropped, so that when we release leases on it,
+	// they are removed from the tableNameCache too.
+	tableState := leaseManager.findTableState(tableDesc.ID, true)
+	tableState.mu.Lock()
+	tableState.mu.dropped = true
+	tableState.mu.Unlock()
+
+	// Try to trigger the race repeatedly: race an acquireFreshestFromStore
+	// against an AcquireByName and Release.
+	// tableChan acts as a barrier, synchronizing the two routines at every
+	// iteration.
+	tableChan := make(chan bool)
+	type Result struct {
+		table *sqlbase.TableDescriptor
+		err   error
+	}
+	resultChan := make(chan Result)
+	go func() {
+		for range tableChan {
+			result := Result{}
+			tableByName, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test")
+			if err != nil {
+				// Move errors to the main goroutine.
+				result.err = err
+				resultChan <- result
+			}
+			result.table = tableByName
+
+			// Move errors to the main goroutine.
+			result.err = leaseManager.Release(tableByName)
+			resultChan <- result
+		}
+	}()
+
+	for i := 0; i < 50; i++ {
+		ctx := context.TODO()
+
+		// This test will need to wait until leases are removed from the store
+		// before creating new leases because the jitter used in the leases'
+		// expiration causes duplicate key errors when trying to create new
+		// leases. This is not a problem in production, since leases are not
+		// removed from the store until they expire, and the jitter is small
+		// compared to their lifetime, but it is a problem in this test because
+		// we churn through leases quickly.
+		tracker := removalTracker.TrackRemoval(table)
+		// Start the race: signal the other goroutine to release, and we do an
+		// acquireFreshestFromStore at the same time.
+		tableChan <- true
+		freshestTable, _, err := leaseManager.acquireFreshestFromStore(ctx, tableDesc.ID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		tracker2 := removalTracker.TrackRemoval(freshestTable)
+		// See if there was an error releasing the lease.
+		result := <-resultChan
+		if result.err != nil {
+			t.Fatal(result.err)
+		}
+
+		// Depending on how the race went, there are a few cases:
+		// - acquireFreshestFromStore began, then AcquireByName got the finished
+		//   lease and released it before acquireFreshestFromStore finished. This
+		//   is the race we are testing for.
+		// - acquireFreshestFromStore finished before Release was called.
+		// - AcquireByName ran first, got a lease, then acquireFreshestFromStore
+		//   got a different lease. We don't care about Release as it released a
+		//   different lease.
+		if freshestTable.ID == result.table.ID {
+			// Case 1 and 2.
+			if err := leaseManager.Release(freshestTable); err != nil {
+				t.Fatal(err)
+			}
+			if err := tracker.WaitForRemoval(); err != nil {
+				t.Fatal(err)
+			}
+		} else {
+			// Case 3.
+			if err := leaseManager.Release(freshestTable); err != nil {
+				t.Fatal(err)
+			}
+			if err := tracker2.WaitForRemoval(); err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+	close(tableChan)
+}
+
+// Test for a race where, when two acquires run simultaneously, one routine
+// releases the lease before the other completes.
+// This happens because of the lock handoff to the async acquireNodeLease.
+// The error manifests as a panic in tableState.acquire from findForTimestamp.
+func TestLeaseReleaseImmediatelyWithConcurrentAcquires(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	removalTracker := NewLeaseRemovalTracker()
+	testingKnobs := base.TestingKnobs{
+		SQLLeaseManager: &LeaseManagerTestingKnobs{
+			LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{
+				LeaseReleasedEvent: removalTracker.LeaseRemovedNotification,
+			},
+		},
+	}
+	s, sqlDB, kvDB := serverutils.StartServer(
+		t, base.TestServerArgs{Knobs: testingKnobs})
+	defer s.Stopper().Stop(context.TODO())
+	leaseManager := s.LeaseManager().(*LeaseManager)
+
+	if _, err := sqlDB.Exec(`
+CREATE DATABASE t;
+CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
+`); err != nil {
+		t.Fatal(err)
+	}
+
+	tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")
+
+	// Populate the name cache.
+	ctx := context.TODO()
+	table, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := leaseManager.Release(table); err != nil {
+		t.Fatal(err)
+	}
+
+	// Pretend the table has been dropped, so that when we release leases on it,
+	// they are removed from the tableNameCache too.
+	tableState := leaseManager.findTableState(tableDesc.ID, true)
+	tableState.mu.Lock()
+	tableState.mu.dropped = true
+	tableState.mu.Unlock()
+
+	// Try to trigger the race repeatedly: race a second AcquireByName
+	// against an AcquireByName and Release.
+	// tableChan acts as a barrier, synchronizing the two routines at every
+	// iteration.
+	tableChan := make(chan bool)
+	type Result struct {
+		table *sqlbase.TableDescriptor
+		err   error
+	}
+	resultChan := make(chan Result)
+	go func() {
+		for range tableChan {
+			result := Result{}
+			tableByName, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test")
+			if err != nil {
+				// Move errors to the main goroutine.
+				result.err = err
+				resultChan <- result
+			}
+			result.table = tableByName
+
+			// Move errors to the main goroutine.
+			result.err = leaseManager.Release(tableByName)
+			resultChan <- result
+		}
+	}()
+
+	for i := 0; i < 50; i++ {
+		ctx := context.TODO()
+
+		// This test will need to wait until leases are removed from the store
+		// before creating new leases because the jitter used in the leases'
+		// expiration causes duplicate key errors when trying to create new
+		// leases. This is not a problem in production, since leases are not
+		// removed from the store until they expire, and the jitter is small
+		// compared to their lifetime, but it is a problem in this test because
+		// we churn through leases quickly.
+		tracker := removalTracker.TrackRemoval(table)
+		// Start the race: signal the other goroutine to release, and we do an
+		// AcquireByName at the same time.
+		tableChan <- true
+		tableByName, _, err := leaseManager.AcquireByName(ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test")
+		// If we failed to handle the race condition, this call will error when
+		// calling t.findForTimestamp, as no valid lease exists.
+		if err != nil {
+			t.Fatal(err)
+		}
+		tracker2 := removalTracker.TrackRemoval(tableByName)
+		// See if there was an error releasing the lease.
+		result := <-resultChan
+		if result.err != nil {
+			t.Fatal(result.err)
+		}
+
+		// Depending on how the race went, there are a few cases:
+		// - One acquire begins, then the second acquire begins, completes, and
+		//   releases before the first finishes. This is the race we are testing
+		//   for.
+		// - One acquire begins and completes before the second is able to release
+		//   the lease.
+		if tableByName.ID == result.table.ID {
+			// Case 1. In a successful case, we've gotten a different lease.
+			if err := leaseManager.Release(tableByName); err != nil {
+				t.Fatal(err)
+			}
+			if err := tracker.WaitForRemoval(); err != nil {
+				t.Fatal(err)
+			}
+		} else {
+			// Case 2.
+			if err := leaseManager.Release(tableByName); err != nil {
+				t.Fatal(err)
+			}
+			if err := tracker2.WaitForRemoval(); err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+	close(tableChan)
+}
diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go
index e5c79a1c149a..6dc6a5525ed7 100644
--- a/pkg/sql/lease_test.go
+++ b/pkg/sql/lease_test.go
@@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) {
 	// table and expiration.
 	l1, e1 := t.mustAcquire(1, descID)
 	l2, e2 := t.mustAcquire(1, descID)
-	if l1.ID != l2.ID || e1 != e2 {
+	if l1.ID != l2.ID {
 		t.Fatalf("expected same lease, but found %v != %v", l1, l2)
+	} else if e1 != e2 {
+		t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2)
 	}
 	t.expectLeases(descID, "/1/1")
 	// Node 2 never acquired a lease on descID, so we should expect an error.
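
Note on the singleflight pattern: the sketch below is a minimal, self-contained illustration of the coalescing behaviour this patch relies on; it is not part of the patch. It is written against the upstream golang.org/x/sync/singleflight package and assumes the fork under pkg/util/syncutil/singleflight exposes equivalent Group.Do semantics (the fork's exact return-value order may differ). The fetchLease helper and the "acquire" key are hypothetical stand-ins for LeaseStore.acquire and the per-table singleflight key.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

// fetchLease stands in for an expensive acquisition (here, a stand-in for
// LeaseStore.acquire): a slow call we want to run at most once per burst of
// concurrent callers.
func fetchLease(calls *int32) (string, error) {
	atomic.AddInt32(calls, 1)
	time.Sleep(50 * time.Millisecond)
	return "lease@v1", nil
}

func main() {
	var g singleflight.Group
	var calls int32
	var wg sync.WaitGroup

	// Ten goroutines race on the same key; Do coalesces them so that only one
	// invocation of fetchLease is in flight, and every caller blocks until it
	// completes and then observes the same result and error.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err, shared := g.Do("acquire", func() (interface{}, error) {
				lease, err := fetchLease(&calls)
				return lease, err
			})
			if err != nil {
				fmt.Println("acquire failed:", err)
				return
			}
			fmt.Printf("got %v (shared=%t)\n", v, shared)
		}()
	}
	wg.Wait()
	// Typically prints 1: the concurrent callers were coalesced onto one fetch.
	fmt.Println("fetchLease ran", atomic.LoadInt32(&calls), "time(s)")
}

In the patch, tableState.acquire follows the same shape around acquireNodeLease, dropping t.mu before calling Do so that callers blocked inside Do do not hold the mutex while they wait.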