From 2cb59c2c9819d05ce5541111cf7befa84fe04c12 Mon Sep 17 00:00:00 2001 From: Joey Pereira Date: Fri, 29 Sep 2017 14:20:37 -0400 Subject: [PATCH] sql: Refactor TableDescriptor lease acquisition to use singleflight This changes the acquisition of leases to use singleflight to reduce the complexity of handling concurrent acquisitions. The behaviour is kept the same, as singleflight.Do blocks until the acquisition completes. The change is in preparation for converting the code to change lease acquisition to happen asynchronously using singleflight.DoChan. This also refactors out the jitter multiplier into a constant. --- pkg/sql/lease.go | 94 +++++++++++++++++++++---------------------- pkg/sql/lease_test.go | 4 +- 2 files changed, 48 insertions(+), 50 deletions(-) diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index b64735beef6d..743ae2f7f0c7 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -544,6 +545,7 @@ type tableState struct { mu struct { syncutil.Mutex + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest @@ -554,11 +556,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. 
// If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -574,17 +571,21 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { + // Acquire a lease if no lease exists or if the latest lease is about to + // expire. Repeatedly check to ensure either a lease is acquired or we receive + // an error. Checking we have a valid lease after acquisition is required + // if the acquiring routine immediately releases the lease. + for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + t.mu.Unlock() + _, _, err := t.mu.group.Do("acquire", func() (interface{}, error) { + return nil, t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Lock() + if err != nil { return nil, err } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -697,20 +698,38 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // Continue to attempt to call acquireNodeLease until our call was the only + // one made by singleflight. This busy-waits while other acquisitions are + // underway. + for { + // Move forward the expiry to acquire a fresh table lease. + + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active . + // TODO(vivek): the expiration time is no longer needed to sort the + // tableVersionState. Get rid of this. 
+ minExpirationTime := hlc.Timestamp{} + newestTable := t.mu.active.findNewest() + if newestTable != nil { + minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + t.mu.Unlock() + // hasAcquiredFreshest is used to determine if our function was the one + // called by group.Do. The shared bool returned by Do does not suffice as it + // returns true if any other calls to Do use the result, not if our function + // returns. If two routines simultaneously call Do here, they endlessly + // retry. + hasAcquiredFreshest := false + _, _, err := t.mu.group.Do("acquire", func() (interface{}, error) { + hasAcquiredFreshest = true + return nil, t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Lock() + if hasAcquiredFreshest || err != nil { + return err + } } - - return t.acquireNodeLease(ctx, m, minExpirationTime) } // upsertLocked inserts a lease for a particular table version. @@ -757,18 +776,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. - for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. 
// minExpirationTime, if not set to the zero value, will be used as a lower @@ -776,29 +783,18 @@ func (t *tableState) acquireWait() { // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, ) error { if m.isDraining() { return errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. - t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) - t.mu.Lock() if err != nil { return err } + t.mu.Lock() + defer t.mu.Unlock() t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) return nil diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index e5c79a1c149a..6dc6a5525ed7 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.