Skip to content

Commit

Permalink
sql: changed LeaseManager API to return/use table descriptors
Browse files Browse the repository at this point in the history
The Acquire/AcquireByName and Release() methods no longer
return/use a LeaseState and instead return/use a
sqlbase.TableDescriptor with an expiration time.

the LeaseManager is moving towards a model where it returns
a table descriptor with an expiration time for the use of the
table descriptor. For now this table descriptor is associated
with a lease, but in the future it might be an older version of
the table descriptor, in which case it will not have a lease.

With this change, leases on the same table descriptor version
are renewed under the covers without needing the API to issue
an explicit release on a table descriptor version. The refcount
on the old lease is simply transferred to the new one. The new
lease is guaranteed to have a higher expiration time and thus a
prior returned expiration time returned by the API is guaranteed
to be contained within the new lease expiration time.

A Release() on the table descriptor will reduce the refcount on the
current lease. In the future, this will reduce the refcount on a table
version which might not be associated with a lease.

Removed the need to use removeLeaseIfExpiring since once a transaction
has picked a timestamp and a valid table descriptor for the timestamp
the table descriptor need not be reacquired through the API.

related to cockroachdb#2948
  • Loading branch information
vivekmenezes committed Jun 14, 2017
1 parent e968655 commit a95331a
Show file tree
Hide file tree
Showing 22 changed files with 362 additions and 398 deletions.
1 change: 0 additions & 1 deletion pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ func TestCommonMethods(t *testing.T) {
{txnType, "IsFinalized"}: {},
{txnType, "NewBatch"}: {},
{txnType, "Exec"}: {},
{txnType, "GetDeadline"}: {},
{txnType, "ResetDeadline"}: {},
{txnType, "Run"}: {},
{txnType, "SetDebugName"}: {},
Expand Down
5 changes: 0 additions & 5 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,6 @@ func (txn *Txn) ResetDeadline() {
txn.deadline = nil
}

// GetDeadline returns the deadline. For testing.
func (txn *Txn) GetDeadline() *hlc.Timestamp {
return txn.deadline
}

// Rollback sends an EndTransactionRequest with Commit=false.
// txn is considered finalized and cannot be used to send any more commands.
func (txn *Txn) Rollback(ctx context.Context) error {
Expand Down
20 changes: 10 additions & 10 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func (sc *SchemaChanger) maybeWriteResumeSpan(
return nil
}

func (sc *SchemaChanger) getTableLease(
ctx context.Context, txn *client.Txn, lc *LeaseCollection, version sqlbase.DescriptorVersion,
func (sc *SchemaChanger) getTableVersion(
ctx context.Context, txn *client.Txn, tc *TableCollection, version sqlbase.DescriptorVersion,
) (*sqlbase.TableDescriptor, error) {
tableDesc, err := lc.getTableLeaseByID(ctx, txn, sc.tableID)
tableDesc, err := tc.getTableVersionByID(ctx, txn, sc.tableID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -285,9 +285,9 @@ func (sc *SchemaChanger) truncateIndexes(
return err
}

lc := &LeaseCollection{leaseMgr: sc.leaseMgr}
defer lc.releaseLeases(ctx)
tableDesc, err := sc.getTableLease(ctx, txn, lc, version)
tc := &TableCollection{leaseMgr: sc.leaseMgr}
defer tc.releaseTables(ctx)
tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
if err != nil {
return err
}
Expand Down Expand Up @@ -402,10 +402,10 @@ func (sc *SchemaChanger) distBackfill(
}
log.VEventf(ctx, 2, "backfill: process %+v spans", spans)
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
lc := &LeaseCollection{leaseMgr: sc.leaseMgr}
tc := &TableCollection{leaseMgr: sc.leaseMgr}
// Use a leased table descriptor for the backfill.
defer lc.releaseLeases(ctx)
tableDesc, err := sc.getTableLease(ctx, txn, lc, version)
defer tc.releaseTables(ctx)
tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
if err != nil {
return err
}
Expand All @@ -415,7 +415,7 @@ func (sc *SchemaChanger) distBackfill(
if backfillType == columnBackfill {
fkTables := sqlbase.TablesNeededForFKs(*tableDesc, sqlbase.CheckUpdates)
for k := range fkTables {
table, err := lc.getTableLeaseByID(ctx, txn, k)
table, err := tc.getTableVersionByID(ctx, txn, k)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (p *planner) getTableScanByRef(
scanVisibility scanVisibility,
) (planDataSource, error) {
tableID := sqlbase.ID(tref.TableID)
descFunc := p.session.leases.getTableLeaseByID
descFunc := p.session.tables.getTableVersionByID
if p.avoidCachedDescriptors {
descFunc = sqlbase.GetTableDescFromID
}
Expand Down Expand Up @@ -502,7 +502,7 @@ func (p *planner) getTableScanOrViewPlan(
desc, err = mustGetTableOrViewDesc(
ctx, p.txn, p.getVirtualTabler(), tn, false /*allowAdding*/)
} else {
desc, err = p.session.leases.getTableLease(ctx, p.txn, p.getVirtualTabler(), tn)
desc, err = p.session.tables.getTableVersion(ctx, p.txn, p.getVirtualTabler(), tn)
}
if err != nil {
return planDataSource{}, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ func (e *Executor) execParsed(
// Exec the schema changers (if the txn rolled back, the schema changers
// will short-circuit because the corresponding descriptor mutation is not
// found).
session.leases.releaseLeases(session.Ctx())
session.tables.releaseTables(session.Ctx())
txnState.schemaChangers.execSchemaChanges(session.Ctx(), e, session, res.ResultList)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestPrepareCanAcquireLeases(t *testing.T) {

// Acquire a lease and assert that the store did in fact create a
// new lease.
_, err := s.LeaseManager().(*LeaseManager).Acquire(
_, _, err := s.LeaseManager().(*LeaseManager).Acquire(
ctx, planner.txn, sqlbase.ID(dummyTableID), 0 /* version */)
if err != nil {
return nil, err
Expand Down
23 changes: 16 additions & 7 deletions pkg/sql/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
Expand All @@ -38,7 +39,7 @@ type LeaseRemovalTracker struct {
mu syncutil.Mutex
// map from a lease whose release we're waiting for to a tracker for that
// lease.
tracking map[*LeaseState]RemovalTracker
tracking map[tableVersionID]RemovalTracker
}

type RemovalTracker struct {
Expand All @@ -50,21 +51,25 @@ type RemovalTracker struct {
// NewLeaseRemovalTracker creates a LeaseRemovalTracker.
func NewLeaseRemovalTracker() *LeaseRemovalTracker {
return &LeaseRemovalTracker{
tracking: make(map[*LeaseState]RemovalTracker),
tracking: make(map[tableVersionID]RemovalTracker),
}
}

// TrackRemoval starts monitoring lease removals for a particular lease.
// This should be called before triggering the operation that (asynchronously)
// removes the lease.
func (w *LeaseRemovalTracker) TrackRemoval(lease *LeaseState) RemovalTracker {
func (w *LeaseRemovalTracker) TrackRemoval(table sqlbase.TableDescriptor) RemovalTracker {
id := tableVersionID{
id: table.ID,
version: table.Version,
}
w.mu.Lock()
defer w.mu.Unlock()
if tracker, ok := w.tracking[lease]; ok {
if tracker, ok := w.tracking[id]; ok {
return tracker
}
tracker := RemovalTracker{removed: make(chan struct{}), err: new(error)}
w.tracking[lease] = tracker
w.tracking[id] = tracker
return tracker
}

Expand All @@ -80,10 +85,14 @@ func (t RemovalTracker) WaitForRemoval() error {
func (w *LeaseRemovalTracker) LeaseRemovedNotification(lease *LeaseState, err error) {
w.mu.Lock()
defer w.mu.Unlock()
if tracker, ok := w.tracking[lease]; ok {
id := tableVersionID{
id: lease.ID,
version: lease.Version,
}
if tracker, ok := w.tracking[id]; ok {
*tracker.err = err
close(tracker.removed)
delete(w.tracking, lease)
delete(w.tracking, id)
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (ie InternalExecutor) ExecuteStatementInTransaction(
) (int, error) {
p := makeInternalPlanner(opName, txn, security.RootUser, ie.LeaseManager.memMetrics)
defer finishInternalPlanner(p)
p.session.leases.leaseMgr = ie.LeaseManager
p.session.tables.leaseMgr = ie.LeaseManager
return p.exec(ctx, statement, qargs...)
}

Expand All @@ -57,7 +57,7 @@ func (ie InternalExecutor) QueryRowInTransaction(
) (parser.Datums, error) {
p := makeInternalPlanner(opName, txn, security.RootUser, ie.LeaseManager.memMetrics)
defer finishInternalPlanner(p)
p.session.leases.leaseMgr = ie.LeaseManager
p.session.tables.leaseMgr = ie.LeaseManager
return p.QueryRow(ctx, statement, qargs...)
}

Expand All @@ -68,7 +68,7 @@ func (ie InternalExecutor) GetTableSpan(
// Lookup the table ID.
p := makeInternalPlanner("get-table-span", txn, user, ie.LeaseManager.memMetrics)
defer finishInternalPlanner(p)
p.session.leases.leaseMgr = ie.LeaseManager
p.session.tables.leaseMgr = ie.LeaseManager

tn := parser.TableName{DatabaseName: parser.Name(dbName), TableName: parser.Name(tableName)}
tableID, err := getTableID(ctx, p, &tn)
Expand Down Expand Up @@ -97,7 +97,7 @@ func getTableID(ctx context.Context, p *planner, tn *parser.TableName) (sqlbase.
return virtual.GetID(), nil
}

dbID, err := p.session.leases.databaseCache.getDatabaseID(ctx, p.txn, p.getVirtualTabler(), tn.Database())
dbID, err := p.session.tables.databaseCache.getDatabaseID(ctx, p.txn, p.getVirtualTabler(), tn.Database())
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit a95331a

Please sign in to comment.