Skip to content

Commit

Permalink
sql: Refactor TableDescriptor lease acquisition to use singleflight
Browse files Browse the repository at this point in the history
This changes the acquisition of leases to use singleflight to reduce the
complexity of handling concurrent acquisitions. The behaviour is kept
the same, as singleflight.Do blocks until the acquisition completes.
The change is in preparation for converting the code so that lease
acquisition happens asynchronously using singleflight.DoChan.

This also refactors out the jitter multiplier into a constant.
  • Loading branch information
lgo committed Oct 4, 2017
1 parent 38d8689 commit e3b0875
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 51 deletions.
95 changes: 46 additions & 49 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -544,6 +545,7 @@ type tableState struct {

mu struct {
syncutil.Mutex
group singleflight.Group

// table descriptors sorted by increasing version. This set always
// contains a table descriptor version with a lease as the latest
Expand All @@ -554,11 +556,6 @@ type tableState struct {
// entry is created with the expiration time of the new lease and
// the older entry is removed.
active tableSet
// A channel used to indicate whether a lease is actively being
// acquired. nil if there is no lease acquisition in progress for
// the table. If non-nil, the channel will be closed when lease
// acquisition completes.
acquiring chan struct{}
// Indicates that the table has been dropped, or is being dropped.
// If set, leases are released from the store as soon as their
// refcount drops to 0, as opposed to waiting until they expire.
Expand All @@ -574,17 +571,21 @@ func (t *tableState) acquire(
) (*tableVersionState, error) {
t.mu.Lock()
defer t.mu.Unlock()
// Wait for any existing lease acquisition.
t.acquireWait()

// Acquire a lease if no lease exists or if the latest lease is
// about to expire.
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
// Acquire a lease if no lease exists or if the latest lease is about to
// expire. Repeatedly check to ensure either a lease is acquired or we receive
// an error. Checking we have a valid lease after acquisition is required
// if the acquiring routine immediately releases the lease.
for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() {
t.mu.Unlock()
_, _, err := t.mu.group.Do("acquire", func() (interface{}, error) {
return nil, t.acquireNodeLease(ctx, m, hlc.Timestamp{})
})
t.mu.Lock()
if err != nil {
return nil, err
}
}

return t.findForTimestamp(ctx, timestamp, m)
}

Expand Down Expand Up @@ -697,20 +698,39 @@ func (t *tableState) findForTimestamp(
//
// t.mu must be locked.
func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error {
// Ensure there is no lease acquisition in progress.
t.acquireWait()

// Move forward to acquire a fresh table lease.
// Continue to attempt to call acquireNodeLease until our call was the only
// one made by singleflight. This busy-waits while other acquisitions are
// underway.
for {
// Move forward the expiry to acquire a fresh table lease.

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active .
// TODO(vivek): the expiration time is no longer needed to sort the
// tableVersionState. Get rid of this.
minExpirationTime := hlc.Timestamp{}
newestTable := t.mu.active.findNewest()
if newestTable != nil {
minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
}

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active .
minExpirationTime := hlc.Timestamp{}
newestTable := t.mu.active.findNewest()
if newestTable != nil {
minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
t.mu.Unlock()
resultChan, wasCalled := t.mu.group.DoChan("acquire", func() (interface{}, error) {
return nil, t.acquireNodeLease(ctx, m, minExpirationTime)
})
result := <-resultChan
t.mu.Lock()
// Before returning we need to check if the newest lease has been released
// immediately by another routine that raced to lock t.mu.
// We also need to check if the table was dropped before locking t.mu.
// If this function call was not the one actually made by singleflight, we
// need to attempt another acquisition to ensure it gets a newer lease.
if result.Err != nil {
return result.Err
} else if s := t.mu.active.findNewest(); wasCalled && s != nil {
return result.Err
}
}

return t.acquireNodeLease(ctx, m, minExpirationTime)
}

// upsertLocked inserts a lease for a particular table version.
Expand Down Expand Up @@ -757,48 +777,25 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) {
}
}

// acquireWait waits until no lease acquisition is in progress.
//
// It must be called with t.mu locked. The mutex is released while blocking
// on the in-progress acquisition's channel and is re-acquired before
// returning, so t.mu is held again when the function returns.
func (t *tableState) acquireWait() {
// Spin until no lease acquisition is in progress. t.mu.acquiring is re-read
// after every wake-up (with the mutex held again), so the loop only exits
// once the field is observed to be nil.
for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring {
// We're called with mu locked, but need to unlock it while we wait
// for the in-progress lease acquisition to finish.
t.mu.Unlock()
// Block on the channel captured while the mutex was held.
<-acquiring
t.mu.Lock()
}
}

// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
// minExpirationTime, if not set to the zero value, will be used as a lower
// bound on the expiration of the new table. This can be used to eliminate the
// jitter in the expiration time, and guarantee that we get a lease that will be
// inserted at the end of the lease set (i.e. it will be returned by
// findNewest() from now on).
//
// t.mu needs to be locked.
func (t *tableState) acquireNodeLease(
ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp,
) error {
if m.isDraining() {
return errors.New("cannot acquire lease when draining")
}

// Notify when lease has been acquired.
t.mu.acquiring = make(chan struct{})
defer func() {
close(t.mu.acquiring)
t.mu.acquiring = nil
}()
// We're called with mu locked, but need to unlock it during lease
// acquisition.
t.mu.Unlock()
table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime)
t.mu.Lock()
if err != nil {
return err
}
t.mu.Lock()
defer t.mu.Unlock()
t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)
return nil
Expand Down
Loading

0 comments on commit e3b0875

Please sign in to comment.