Skip to content

Commit

Permalink
sql: Refactor TableDescriptor lease acquisition to use singleflight
Browse files Browse the repository at this point in the history
This changes the acquisition of leases to use singleflight to reduce the
complexity of handling concurrent acquisitions. The behaviour is kept
the same as singleflight.Do blocks until the acquisition completes.
The change is in preperation for converting the code to change lease
acquisition to happen asynchronously using singleflight.DoChan.

This also refactors out the jitter multiplier into a constant.
  • Loading branch information
lgo committed Oct 4, 2017
1 parent 38d8689 commit 9a7473e
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 51 deletions.
99 changes: 50 additions & 49 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -544,6 +545,7 @@ type tableState struct {

mu struct {
syncutil.Mutex
group singleflight.Group

// table descriptors sorted by increasing version. This set always
// contains a table descriptor version with a lease as the latest
Expand All @@ -554,11 +556,6 @@ type tableState struct {
// entry is created with the expiration time of the new lease and
// the older entry is removed.
active tableSet
// A channel used to indicate whether a lease is actively being
// acquired. nil if there is no lease acquisition in progress for
// the table. If non-nil, the channel will be closed when lease
// acquisition completes.
acquiring chan struct{}
// Indicates that the table has been dropped, or is being dropped.
// If set, leases are released from the store as soon as their
// refcount drops to 0, as opposed to waiting until they expire.
Expand All @@ -574,17 +571,21 @@ func (t *tableState) acquire(
) (*tableVersionState, error) {
t.mu.Lock()
defer t.mu.Unlock()
// Wait for any existing lease acquisition.
t.acquireWait()

// Acquire a lease if no lease exists or if the latest lease is
// about to expire.
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
// Acquire a lease if no lease exists or if the latest lease is about to
// expire. Repeatedly check to ensure either a lease is acquired or we receive
// an error. Checking we have a valid lease after acquisition is required
// if the acquiring routine immediately releases the lease.
for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() {
t.mu.Unlock()
_, _, err := t.mu.group.Do("acquire", func() (interface{}, error) {
return nil, t.acquireNodeLease(ctx, m, hlc.Timestamp{})
})
t.mu.Lock()
if err != nil {
return nil, err
}
}

return t.findForTimestamp(ctx, timestamp, m)
}

Expand Down Expand Up @@ -697,20 +698,43 @@ func (t *tableState) findForTimestamp(
//
// t.mu must be locked.
func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error {
// Ensure there is no lease acquisition in progress.
t.acquireWait()

// Move forward to acquire a fresh table lease.
// Continue to attempt to call to acquireNodeLease until our call was the only
// one made by singleflight. This busy-waits while other acquisitions are
// underway.
for {
// Move forward the expiry to acquire a fresh table lease.

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active .
// TODO(vivek): the expiration time is no longer needed to sort the
// tableVersionState. Get rid of this.
minExpirationTime := hlc.Timestamp{}
newestTable := t.mu.active.findNewest()
if newestTable != nil {
minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
}

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active .
minExpirationTime := hlc.Timestamp{}
newestTable := t.mu.active.findNewest()
if newestTable != nil {
minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
t.mu.Unlock()
// hasAcquiredFreshest is used to determine if our function was the one
// called by group.Do. The shared bool returned by Do is not suffice as it
// returns true if any other calls to Do use the result, not if our function
// returns. If two routines simultaneously call Do here, they endlessly
// retry.
hasAcquiredFreshest := false
_, _, err := t.mu.group.Do("acquire", func() (interface{}, error) {
hasAcquiredFreshest = true
return nil, t.acquireNodeLease(ctx, m, minExpirationTime)
})
t.mu.Lock()
// Before returning we need to check if the newest lease has been released
// immediately from an other routine that raced to lock t.mu.
// We also need to check if the table was dropped before locking t.mu.
if err != nil {
return nil
} else if s := t.mu.active.findNewest(); hasAcquiredFreshest && s != nil {
return err
}
}

return t.acquireNodeLease(ctx, m, minExpirationTime)
}

// upsertLocked inserts a lease for a particular table version.
Expand Down Expand Up @@ -757,48 +781,25 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) {
}
}

// acquireWait waits until no lease acquisition is in progress.
func (t *tableState) acquireWait() {
// Spin until no lease acquisition is in progress.
for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring {
// We're called with mu locked, but need to unlock it while we wait
// for the in-progress lease acquisition to finish.
t.mu.Unlock()
<-acquiring
t.mu.Lock()
}
}

// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
// minExpirationTime, if not set to the zero value, will be used as a lower
// bound on the expiration of the new table. This can be used to eliminate the
// jitter in the expiration time, and guarantee that we get a lease that will be
// inserted at the end of the lease set (i.e. it will be returned by
// findNewest() from now on).
//
// t.mu needs to be locked.
func (t *tableState) acquireNodeLease(
ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp,
) error {
if m.isDraining() {
return errors.New("cannot acquire lease when draining")
}

// Notify when lease has been acquired.
t.mu.acquiring = make(chan struct{})
defer func() {
close(t.mu.acquiring)
t.mu.acquiring = nil
}()
// We're called with mu locked, but need to unlock it during lease
// acquisition.
t.mu.Unlock()
table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime)
t.mu.Lock()
if err != nil {
return err
}
t.mu.Lock()
defer t.mu.Unlock()
t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)
return nil
Expand Down
Loading

0 comments on commit 9a7473e

Please sign in to comment.