Skip to content

Commit

Permalink
sql: Refactor TableDescriptor lease acquisition to use singleflight
Browse files Browse the repository at this point in the history
This changes the acquisition of leases to use singleflight to reduce the
complexity of handling concurrent acquisitions. The behaviour is kept
the same as we block on the results of singleflight.DoChan. The change
is in preparation for adding a new codepath which acquires leases
asynchronously.

This also refactors out the jitter multiplier into a constant.
  • Loading branch information
lgo committed Oct 16, 2017
1 parent 2331ca1 commit f620218
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 71 deletions.
202 changes: 133 additions & 69 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand All @@ -46,9 +47,16 @@ import (

const (
// LeaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered in the range
// [0.75,1.25]*LeaseDuration. Exported for testing purposes only.
// actual duration is jittered using LeaseJitterFraction. Jittering is done to
// prevent multiple leases from being renewed simultaneously if they were all
// acquired simultaneously. Exported for testing purposes only.
LeaseDuration = 5 * time.Minute
// LeaseJitterFraction is the factor that we use to randomly jitter the lease
// duration when acquiring a new lease and the lease renewal timeout. The
// range of the actual lease duration will be
// [(1-LeaseJitterFraction) * LeaseDuration, (1+LeaseJitterFraction) * LeaseDuration]
// Exported for testing purposes only.
LeaseJitterFraction = 0.25
)

// tableVersionState holds the state for a table version. This includes
Expand Down Expand Up @@ -118,18 +126,21 @@ type LeaseStore struct {
clock *hlc.Clock
nodeID *base.NodeIDContainer

// leaseDuration is a constant initialized by NewLeaseManager. It is modified
// only during by tests through testSetLeaseDuration.
leaseDuration time.Duration
// leaseDuration and leasejitterFraction are a constants initialized by
// NewLeaseManager. They are modified only during by tests through
// testSetLeaseDuration and testSetLeaseJitterFraction.
leaseDuration time.Duration
leaseJitterFraction float64

testingKnobs LeaseStoreTestingKnobs
memMetrics *MemoryMetrics
}

// jitteredLeaseDuration returns a randomly jittered duration from the interval
// [0.75 * leaseDuration, 1.25 * leaseDuration].
func jitteredLeaseDuration(leaseDuration time.Duration) time.Duration {
return time.Duration(float64(leaseDuration) * (0.75 + 0.5*rand.Float64()))
// [(1-LeaseJitterFraction) * leaseDuration, (1+LeaseJitterFraction) * leaseDuration].
func jitteredLeaseDuration(leaseDuration time.Duration, leaseJitterFraction float64) time.Duration {
return time.Duration(float64(leaseDuration) * (1 - leaseJitterFraction +
2*leaseJitterFraction*rand.Float64()))
}

// acquire a lease on the most recent version of a table descriptor.
Expand All @@ -141,7 +152,7 @@ func (s LeaseStore) acquire(
var table *tableVersionState
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
expiration := txn.OrigTimestamp()
expiration.WallTime += int64(jitteredLeaseDuration(s.leaseDuration))
expiration.WallTime += int64(jitteredLeaseDuration(s.leaseDuration, s.leaseJitterFraction))
if expiration.Less(minExpirationTime) {
expiration = minExpirationTime
}
Expand Down Expand Up @@ -445,6 +456,11 @@ func (s LeaseStore) getForExpiration(
return table, err
}

// leaseToken is an opaque token representing a lease. It's distinct from a
// lease to define restricted capabilities and prevent improper use of a lease
// where we instead have leaseTokens.
type leaseToken *tableVersionState

// tableSet maintains an ordered set of tableVersionState objects sorted
// by version. It supports addition and removal of elements, finding the
// table for a particular version, or finding the most recent table version.
Expand All @@ -470,6 +486,15 @@ func (l *tableSet) String() string {
return buf.String()
}

// isNewest checks if the leaseToken represents the newest lease in the
// tableSet.
func (l *tableSet) isNewest(t leaseToken) bool {
if len(l.data) == 0 {
return false
}
return leaseToken(l.data[len(l.data)-1]) == t
}

func (l *tableSet) insert(s *tableVersionState) {
i, match := l.findIndex(s.Version)
if match {
Expand Down Expand Up @@ -540,6 +565,8 @@ func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionS
return nil
}

const acquireGroupKey = "acquire"

type tableState struct {
id sqlbase.ID
// The cache is updated every time we acquire or release a table.
Expand All @@ -549,6 +576,9 @@ type tableState struct {
mu struct {
syncutil.Mutex

// group is used for all calls made to acquireNodeLease to prevent
// concurrent lease acquisitions from the store.
group singleflight.Group
// table descriptors sorted by increasing version. This set always
// contains a table descriptor version with a lease as the latest
// entry. There may be more than one active lease when the system is
Expand All @@ -558,11 +588,6 @@ type tableState struct {
// entry is created with the expiration time of the new lease and
// the older entry is removed.
active tableSet
// A channel used to indicate whether a lease is actively being
// acquired. nil if there is no lease acquisition in progress for
// the table. If non-nil, the channel will be closed when lease
// acquisition completes.
acquiring chan struct{}
// Indicates that the table has been dropped, or is being dropped.
// If set, leases are released from the store as soon as their
// refcount drops to 0, as opposed to waiting until they expire.
Expand All @@ -578,17 +603,26 @@ func (t *tableState) acquire(
) (*tableVersionState, error) {
t.mu.Lock()
defer t.mu.Unlock()
// Wait for any existing lease acquisition.
t.acquireWait()

// Acquire a lease if no lease exists or if the latest lease is
// about to expire.
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
return nil, err
// Acquire a lease if no lease exists or if the latest lease is about to
// expire. Looping is necessary because lease acquisition is done without
// holding the tableState lock, so anything can happen in between lease
// acquisition and us getting control again.
for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() {
var resultChan <-chan singleflight.Result
resultChan, _ = t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) {
return t.acquireNodeLease(ctx, m, hlc.Timestamp{})
})
t.mu.Unlock()
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent()
}
result := <-resultChan
t.mu.Lock()
if result.Err != nil {
return nil, result.Err
}
}

return t.findForTimestamp(ctx, timestamp, m)
}

Expand Down Expand Up @@ -701,20 +735,59 @@ func (t *tableState) findForTimestamp(
//
// t.mu must be locked.
func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error {
// Ensure there is no lease acquisition in progress.
t.acquireWait()

// Move forward to acquire a fresh table lease.
// We need to acquire a lease on a "fresh" descriptor, meaning that joining
// a potential in-progress lease acquisition is generally not good enough.
// If we are to join an in-progress acquisition, it needs to be an acquisition
// initiated after this point.
// So, we handle two cases:
// 1. The first DoChan() call tells us that we didn't join an in-progress
// acquisition. Great, the lease that's being acquired is good.
// 2. The first DoChan() call tells us that we did join an in-progress acq.
// We have to wait this acquisition out; it's not good for us. But any
// future acquisition is good, so the next time around the loop it doesn't
// matter if we initiate a request or join an in-progress one.
// In both cases, we need to check if the lease we want is still valid because
// lease acquisition is done without holding the tableState lock, so anything
// can happen in between lease acquisition and us getting control again.
attemptsMade := 0
for {
// Move forward the expiry to acquire a fresh table lease.

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active.
// TODO(vivek): the expiration time is no longer needed to sort the
// tableVersionState. Get rid of this.
minExpirationTime := hlc.Timestamp{}
s := t.mu.active.findNewest()
if s != nil {
minExpirationTime = s.expiration.Add(int64(time.Millisecond), 0)
}

// Set the min expiration time to guarantee that the lease acquired is the
// last lease in t.mu.active .
minExpirationTime := hlc.Timestamp{}
newestTable := t.mu.active.findNewest()
if newestTable != nil {
minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0)
resultChan, wasCalled := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) {
return t.acquireNodeLease(ctx, m, minExpirationTime)
})
t.mu.Unlock()
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireFreshestResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireFreshestResultBlockEvent()
}
result := <-resultChan
t.mu.Lock()
if result.Err != nil {
return result.Err
}
l := result.Val.(leaseToken)
if wasCalled && t.mu.active.isNewest(l) {
// Case 1: we didn't join an in-progress call and the lease is still
// valid.
break
} else if attemptsMade > 1 && t.mu.active.isNewest(l) {
// Case 2: more than one acquisition has happened and the lease is still
// valid.
break
}
attemptsMade++
}

return t.acquireNodeLease(ctx, m, minExpirationTime)
return nil
}

// upsertLocked inserts a lease for a particular table version.
Expand Down Expand Up @@ -761,51 +834,28 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) {
}
}

// acquireWait waits until no lease acquisition is in progress.
func (t *tableState) acquireWait() {
// Spin until no lease acquisition is in progress.
for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring {
// We're called with mu locked, but need to unlock it while we wait
// for the in-progress lease acquisition to finish.
t.mu.Unlock()
<-acquiring
t.mu.Lock()
}
}

// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
// minExpirationTime, if not set to the zero value, will be used as a lower
// bound on the expiration of the new table. This can be used to eliminate the
// jitter in the expiration time, and guarantee that we get a lease that will be
// inserted at the end of the lease set (i.e. it will be returned by
// findNewest() from now on).
//
// t.mu needs to be locked.
func (t *tableState) acquireNodeLease(
ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp,
) error {
) (leaseToken, error) {
if m.isDraining() {
return errors.New("cannot acquire lease when draining")
return nil, errors.New("cannot acquire lease when draining")
}

// Notify when lease has been acquired.
t.mu.acquiring = make(chan struct{})
defer func() {
close(t.mu.acquiring)
t.mu.acquiring = nil
}()
// We're called with mu locked, but need to unlock it during lease
// acquisition.
t.mu.Unlock()
table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime)
t.mu.Lock()
if err != nil {
return err
return nil, err
}
t.mu.Lock()
defer t.mu.Unlock()
t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)
return nil
return leaseToken(table), nil
}

func (t *tableState) release(table *sqlbase.TableDescriptor, m *LeaseManager) error {
Expand Down Expand Up @@ -936,6 +986,12 @@ type LeaseStoreTestingKnobs struct {
LeaseReleasedEvent func(table sqlbase.TableDescriptor, err error)
// Called after a lease is acquired, with any operation error.
LeaseAcquiredEvent func(table sqlbase.TableDescriptor, err error)
// Called before waiting on a results from a DoChan call of acquireNodeLease
// in tableState.acquire(), with any operation error.
LeaseAcquireResultBlockEvent func()
// Called before waiting on a results from a DoChan call of acquireNodeLease
// in tableState.acquireFreshestFromStoreLocked(), with any operation error.
LeaseAcquireFreshestResultBlockEvent func()
// RemoveOnceDereferenced forces leases to be removed
// as soon as they are dereferenced.
RemoveOnceDereferenced bool
Expand Down Expand Up @@ -1103,12 +1159,13 @@ func NewLeaseManager(
) *LeaseManager {
lm := &LeaseManager{
LeaseStore: LeaseStore{
db: db,
clock: clock,
nodeID: nodeID,
leaseDuration: LeaseDuration,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
db: db,
clock: clock,
nodeID: nodeID,
leaseDuration: LeaseDuration,
leaseJitterFraction: LeaseJitterFraction,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
},
testingKnobs: testingKnobs,
tableNames: tableNameCache{
Expand Down Expand Up @@ -1136,6 +1193,13 @@ func (m *LeaseManager) TestSetLeaseDuration(leaseDuration time.Duration) {
m.LeaseStore.leaseDuration = leaseDuration
}

// TestSetLeaseJitterFraction is used to modify the leaseJitterFraction constant
// for the lifespan of the LeaseManager. This function exists and is exported only
// for testing purposes.
func (m *LeaseManager) TestSetLeaseJitterFraction(leaseJitterFraction float64) {
m.LeaseStore.leaseJitterFraction = leaseJitterFraction
}

// AcquireByName returns a table version for the specified table valid for
// the timestamp. It returns the table descriptor and a expiration time.
// A transaction using this descriptor must ensure that its
Expand Down
Loading

0 comments on commit f620218

Please sign in to comment.