From 54a3bbcb5737e08c4ca7949355b40a005b0ecda0 Mon Sep 17 00:00:00 2001 From: Joey Pereira Date: Fri, 29 Sep 2017 14:20:37 -0400 Subject: [PATCH] sql: Refactor TableDescriptor lease acquisition to use singleflight This changes the acquisition of leases to use singleflight to reduce the complexity of handling concurrent acquisitions. The behaviour is kept the same as singleflight.Do blocks until the acquisition completes. The change is in preperation for converting the code to change lease acquisition to happen asynchronously using singleflight.DoChan. This also refactors out the jitter multiplier into a constant. --- pkg/sql/lease.go | 100 ++++++++++++++++++++---------------------- pkg/sql/lease_test.go | 4 +- 2 files changed, 51 insertions(+), 53 deletions(-) diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index b64735beef6d..216af9f69b7e 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -46,9 +47,16 @@ import ( var ( // LeaseDuration is the mean duration a lease will be acquired for. The - // actual duration is jittered in the range - // [0.75,1.25]*LeaseDuration. Exported for testing purposes only. + // actual duration is jittered using LeaseJitterFraction. Jittering is done to + // prevent multiple leases from being renewed simultaneously if they were all + // first acquired simultaneously. Exported for testing purposes only. LeaseDuration = 5 * time.Minute + // LeaseJitterFraction is the factor that we use to randomly jitter the lease + // duration when acquiring a new lease and the lease renewal timeout. The + // range of the actual lease duration will be + // [(1-LeaseJitterFraction) * LeaseDuration, (1+LeaseJitterFraction) * LeaseDuration] + // Exported for testing purposes only. + LeaseJitterFraction = 0.25 ) // tableVersionState holds the state for a table version. This includes @@ -123,9 +131,10 @@ type LeaseStore struct { } // jitteredLeaseDuration returns a randomly jittered duration from the interval -// [0.75 * leaseDuration, 1.25 * leaseDuration]. +// [(1-LeaseJitterFraction) * leaseDuration, (1+LeaseJitterFraction) * leaseDuration]. func jitteredLeaseDuration() time.Duration { - return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64())) + return time.Duration(float64(LeaseDuration) * (1 - LeaseJitterFraction + + 2*LeaseJitterFraction*rand.Float64())) } // acquire a lease on the most recent version of a table descriptor. @@ -544,6 +553,7 @@ type tableState struct { mu struct { syncutil.Mutex + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest @@ -554,11 +564,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -574,17 +579,21 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { + // Acquire a lease if no lease exists or if the latest lease is about to + // expire. Repeatedly check to ensure either a lease is acquired or we receive + // an error. Checking we havea valid lease after acquisition is is required + // if the acquiring routine immediately releases the lease. + for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + t.mu.Unlock() + _, _, err := t.mu.group.Do("acquire", func() (interface{}, error) { + return nil, t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Lock() + if err != nil { return nil, err } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -697,20 +706,30 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // Continue to attempt to call to acquireNodeLease until our call was the one + // made by singleflight. This busy-waits while another acquisitions are + // underway. + var err error + hasAcquiredFreshest := false + for !hasAcquiredFreshest { + // Move forward the expiry to acquire a fresh table lease. + + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active . + minExpirationTime := hlc.Timestamp{} + newestTable := t.mu.active.findNewest() + if newestTable != nil { + minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + t.mu.Unlock() + _, _, err = t.mu.group.Do("acquire", func() (interface{}, error) { + hasAcquiredFreshest = true + return nil, t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Lock() } - - return t.acquireNodeLease(ctx, m, minExpirationTime) + return err } // upsertLocked inserts a lease for a particular table version. @@ -757,18 +776,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. - for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. // minExpirationTime, if not set to the zero value, will be used as a lower @@ -776,26 +783,15 @@ func (t *tableState) acquireWait() { // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, ) error { if m.isDraining() { return errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. - t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) t.mu.Lock() + defer t.mu.Unlock() if err != nil { return err } diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index e5c79a1c149a..6dc6a5525ed7 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.