Skip to content

Commit

Permalink
sql: Refactor TableDescriptor lease acquisition to use singleflight
Browse files Browse the repository at this point in the history
This changes the acquisition of leases to use singleflight to reduce the
complexity of handling concurrent acquisitions. The behaviour is kept
the same as we block on the results of singleflight.DoChan. The change
is in preparation for adding a new codepath which acquires leases
asynchronously.
  • Loading branch information
lgo committed Oct 16, 2017
1 parent 43feb10 commit f52f0b2
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 56 deletions.
164 changes: 110 additions & 54 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -447,6 +448,11 @@ func (s LeaseStore) getForExpiration(
return table, err
}

// leaseToken is an opaque token representing a lease. It's distinct from a
// lease to define restricted capabilities and prevent improper use of a lease
// where we instead have leaseTokens.
type leaseToken *tableVersionState

// tableSet maintains an ordered set of tableVersionState objects sorted
// by version. It supports addition and removal of elements, finding the
// table for a particular version, or finding the most recent table version.
Expand All @@ -472,6 +478,15 @@ func (l *tableSet) String() string {
return buf.String()
}

// isNewest checks if the leaseToken represents the newest lease in the
// tableSet.
func (l *tableSet) isNewest(t leaseToken) bool {
if len(l.data) == 0 {
return false
}
return leaseToken(l.data[len(l.data)-1]) == t
}

func (l *tableSet) insert(s *tableVersionState) {
i, match := l.findIndex(s.Version)
if match {
Expand Down Expand Up @@ -542,6 +557,8 @@ func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionS
return nil
}

const acquireGroupKey = "acquire"

type tableState struct {
id sqlbase.ID
// The cache is updated every time we acquire or release a table.
Expand All @@ -551,6 +568,9 @@ type tableState struct {
mu struct {
syncutil.Mutex

// group is used for all calls made to acquireNodeLease to prevent
// concurrent lease acquisitions from the store.
group singleflight.Group
// table descriptors sorted by increasing version. This set always
// contains a table descriptor version with a lease as the latest
// entry. There may be more than one active lease when the system is
Expand All @@ -560,11 +580,6 @@ type tableState struct {
// entry is created with the expiration time of the new lease and
// the older entry is removed.
active tableSet
// A channel used to indicate whether a lease is actively being
// acquired. nil if there is no lease acquisition in progress for
// the table. If non-nil, the channel will be closed when lease
// acquisition completes.
acquiring chan struct{}
// Indicates that the table has been dropped, or is being dropped.
// If set, leases are released from the store as soon as their
// refcount drops to 0, as opposed to waiting until they expire.
Expand All @@ -580,17 +595,26 @@ func (t *tableState) acquire(
) (*tableVersionState, error) {
t.mu.Lock()
defer t.mu.Unlock()
// Wait for any existing lease acquisition.
t.acquireWait()

// Acquire a lease if no lease exists or if the latest lease is
// about to expire.
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
return nil, err
// Acquire a lease if no lease exists or if the latest lease is about to
// expire. Looping is necessary because lease acquisition is done without
// holding the tableState lock, so anything can happen in between lease
// acquisition and us getting control again.
for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() {
var resultChan <-chan singleflight.Result
resultChan, _ = t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) {
return t.acquireNodeLease(ctx, m, hlc.Timestamp{})
})
t.mu.Unlock()
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireBlock)
}
result := <-resultChan
t.mu.Lock()
if result.Err != nil {
return nil, result.Err
}
}

return t.findForTimestamp(ctx, timestamp, m)
}

Expand Down Expand Up @@ -703,20 +727,59 @@ func (t *tableState) findForTimestamp(
//
// t.mu must be locked.
func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error {
// Ensure there is no lease acquisition in progress.
t.acquireWait()

// Move forward to acquire a fresh table lease.
// We need to acquire a lease on a "fresh" descriptor, meaning that joining
// a potential in-progress lease acquisition is generally not good enough.
// If we are to join an in-progress acquisition, it needs to be an acquisition
// initiated after this point.
// So, we handle two cases:
// 1. The first DoChan() call tells us that we didn't join an in-progress
// acquisition. Great, the lease that's being acquired is good.
// 2. The first DoChan() call tells us that we did join an in-progress acq.
// We have to wait this acquisition out; it's not good for us. But any
// future acquisition is good, so the next time around the loop it doesn't
// matter if we initiate a request or join an in-progress one.
// In both cases, we need to check if the lease we want is still valid because
// lease acquisition is done without holding the tableState lock, so anything
// can happen in between lease acquisition and us getting control again.
attemptsMade := 0
for {
// Move forward the expiry to acquire a fresh table lease.

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active.
// TODO(vivek): the expiration time is no longer needed to sort the
// tableVersionState. Get rid of this.
minExpirationTime := hlc.Timestamp{}
s := t.mu.active.findNewest()
if s != nil {
minExpirationTime = s.expiration.Add(int64(time.Millisecond), 0)
}

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active .
minExpirationTime := hlc.Timestamp{}
newestTable := t.mu.active.findNewest()
if newestTable != nil {
minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
resultChan, wasCalled := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) {
return t.acquireNodeLease(ctx, m, minExpirationTime)
})
t.mu.Unlock()
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireFreshestBlock)
}
result := <-resultChan
t.mu.Lock()
if result.Err != nil {
return result.Err
}
l := result.Val.(leaseToken)
if wasCalled && t.mu.active.isNewest(l) {
// Case 1: we didn't join an in-progress call and the lease is still
// valid.
break
} else if attemptsMade > 1 && t.mu.active.isNewest(l) {
// Case 2: more than one acquisition has happened and the lease is still
// valid.
break
}
attemptsMade++
}

return t.acquireNodeLease(ctx, m, minExpirationTime)
return nil
}

// upsertLocked inserts a lease for a particular table version.
Expand Down Expand Up @@ -763,51 +826,28 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) {
}
}

// acquireWait waits until no lease acquisition is in progress.
func (t *tableState) acquireWait() {
// Spin until no lease acquisition is in progress.
for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring {
// We're called with mu locked, but need to unlock it while we wait
// for the in-progress lease acquisition to finish.
t.mu.Unlock()
<-acquiring
t.mu.Lock()
}
}

// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
// minExpirationTime, if not set to the zero value, will be used as a lower
// bound on the expiration of the new table. This can be used to eliminate the
// jitter in the expiration time, and guarantee that we get a lease that will be
// inserted at the end of the lease set (i.e. it will be returned by
// findNewest() from now on).
//
// t.mu needs to be locked.
func (t *tableState) acquireNodeLease(
ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp,
) error {
) (leaseToken, error) {
if m.isDraining() {
return errors.New("cannot acquire lease when draining")
return nil, errors.New("cannot acquire lease when draining")
}

// Notify when lease has been acquired.
t.mu.acquiring = make(chan struct{})
defer func() {
close(t.mu.acquiring)
t.mu.acquiring = nil
}()
// We're called with mu locked, but need to unlock it during lease
// acquisition.
t.mu.Unlock()
table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime)
t.mu.Lock()
if err != nil {
return err
return nil, err
}
t.mu.Lock()
defer t.mu.Unlock()
t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)
return nil
return leaseToken(table), nil
}

func (t *tableState) release(table *sqlbase.TableDescriptor, m *LeaseManager) error {
Expand Down Expand Up @@ -931,13 +971,29 @@ func (t *tableState) purgeOldVersions(
return err
}

// LeaseAcquireBlockType is the type of blocking result event when
// calling LeaseAcquireResultBlockEvent.
type LeaseAcquireBlockType int

const (
// LeaseAcquireBlock denotes the LeaseAcquireResultBlockEvent is
// coming from tableState.acquire().
LeaseAcquireBlock LeaseAcquireBlockType = iota
// LeaseAcquireFreshestBlock denotes the LeaseAcquireResultBlockEvent is
// from tableState.acquireFreshestFromStoreLocked().
LeaseAcquireFreshestBlock
)

// LeaseStoreTestingKnobs contains testing knobs.
type LeaseStoreTestingKnobs struct {
// Called after a lease is removed from the store, with any operation error.
// See LeaseRemovalTracker.
LeaseReleasedEvent func(table sqlbase.TableDescriptor, err error)
// Called after a lease is acquired, with any operation error.
LeaseAcquiredEvent func(table sqlbase.TableDescriptor, err error)
// Called before waiting on a results from a DoChan call of acquireNodeLease
// in tableState.acquire() and tableState.acquireFreshestFromStoreLocked().
LeaseAcquireResultBlockEvent func(leaseBlockType LeaseAcquireBlockType)
// RemoveOnceDereferenced forces leases to be removed
// as soon as they are dereferenced.
RemoveOnceDereferenced bool
Expand Down
Loading

0 comments on commit f52f0b2

Please sign in to comment.