diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index b64735beef6d..743ae2f7f0c7 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -544,6 +545,7 @@ type tableState struct { mu struct { syncutil.Mutex + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest @@ -554,11 +556,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -574,17 +571,21 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { + // Acquire a lease if no lease exists or if the latest lease is about to + // expire. Repeatedly check to ensure either a lease is acquired or we receive + // an error. Checking we have a valid lease after acquisition is required + // if the acquiring routine immediately releases the lease.
+ for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + t.mu.Unlock() + _, _, err := t.mu.group.Do("acquire", func() (interface{}, error) { + return nil, t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Lock() + if err != nil { return nil, err } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -697,20 +698,38 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // Continue to attempt to call acquireNodeLease until our call is the one + // executed by singleflight. This busy-waits while other acquisitions are + // underway. + for { + // Move forward the expiry to acquire a fresh table lease. + + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active . + // TODO(vivek): the expiration time is no longer needed to sort the + // tableVersionState. Get rid of this. + minExpirationTime := hlc.Timestamp{} + newestTable := t.mu.active.findNewest() + if newestTable != nil { + minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + t.mu.Unlock() + // hasAcquiredFreshest is used to determine if our function was the one + // called by group.Do. The shared bool returned by Do does not suffice as it + // returns true if any other calls to Do use the result, not if our function + // was executed. If we relied on it, two routines that simultaneously call + // Do here would endlessly retry.
+ hasAcquiredFreshest := false + _, _, err := t.mu.group.Do("acquire", func() (interface{}, error) { + hasAcquiredFreshest = true + return nil, t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Lock() + if hasAcquiredFreshest || err != nil { + return err + } } - - return t.acquireNodeLease(ctx, m, minExpirationTime) } // upsertLocked inserts a lease for a particular table version. @@ -757,18 +776,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. - for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. // minExpirationTime, if not set to the zero value, will be used as a lower @@ -776,29 +783,18 @@ func (t *tableState) acquireWait() { // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, ) error { if m.isDraining() { return errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. 
- t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) - t.mu.Lock() if err != nil { return err } + t.mu.Lock() + defer t.mu.Unlock() t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) return nil diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index e5c79a1c149a..6dc6a5525ed7 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.