From ea92406dddac473cf2a29aa9498af8765d3641fb Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Thu, 22 Apr 2021 14:34:27 -0400 Subject: [PATCH 1/2] sql: implicit transactions did not properly release leases Previously, when an implicit transaction was used during the prepare phase a new transaction was constructed, which would acquire leases as needed. The descCollections will be shared between the execution and prepare phases, which could lead to these leases being held during the execute phase even though the prepare transaction is complete. To address this, the current patch will release those leases right after the prepare phase. Release note: None --- pkg/sql/conn_executor_prepare.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index 40fe72f35069..0e9fa7d2a5a6 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -227,6 +227,11 @@ func (ex *connExecutor) prepare( if err := ex.server.cfg.DB.Txn(ctx, prepare); err != nil { return nil, err } + // Prepare is an implicit transaction and we know it will not be + // used with any other descriptors. Once the implicit transaction is + // done we can safely drop the leases, since the collections object + // will be shared. + ex.extraTxnState.descCollection.ReleaseAll(ctx) } // Account for the memory used by this prepared statement. From df39050b6cad3ad6c414b0ae5520fdae6b85916e Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Wed, 14 Apr 2021 11:24:22 -0400 Subject: [PATCH 2/2] kv,sql: transaction commit deadlines due to leases should be late-bound Previously, SQL statements, during their execution, lease tables from the lease.Manager indirectly through the connExecutor's descs.Collection. Whenever a lease is used, the deadline of that version of that lease is applied to the transaction as a commit deadline. 
This had the unfortunate side effect that renewals would not be properly picked up, so if a transaction took longer than the 4 to 5 minute validity period, then we could get into problematic scenarios. To address this, this patch adds lease tracking through the table collections, so that we are aware when an extension occurs, and uses that to figure out the latest time a given transaction can commit. Fixes #51042 Release note (bug fix): Fixed a bug that prevented transactions which lasted longer than 5 minutes and then performed writes from committing. --- pkg/base/config.go | 25 - pkg/base/test_server_args.go | 3 - .../testdata/benchmark_expectations | 90 ++-- pkg/ccl/changefeedccl/rowfetcher_cache.go | 8 +- .../kvclient/kvcoord/txn_coord_sender_test.go | 27 +- pkg/kv/txn.go | 34 +- pkg/kv/txn_test.go | 19 +- pkg/server/config.go | 4 - pkg/server/server_sql.go | 1 - pkg/server/testserver.go | 5 - pkg/settings/setting.go | 2 +- pkg/sql/catalog/descs/collection.go | 91 ++-- pkg/sql/catalog/lease/BUILD.bazel | 1 + pkg/sql/catalog/lease/helpers_test.go | 10 +- pkg/sql/catalog/lease/lease.go | 494 ++++++++++-------- pkg/sql/catalog/lease/lease_internal_test.go | 126 ++--- pkg/sql/catalog/lease/lease_test.go | 405 ++++++++++---- pkg/sql/conn_executor.go | 19 +- pkg/sql/conn_executor_exec.go | 6 + pkg/sql/conn_executor_prepare.go | 9 +- pkg/sql/schema_changer_test.go | 8 +- 21 files changed, 789 insertions(+), 598 deletions(-) diff --git a/pkg/base/config.go b/pkg/base/config.go index 9970f26f9e7d..ffc1674c6ff3 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -596,28 +596,3 @@ func TempStorageConfigFromEnv( Settings: st, } } - -// LeaseManagerConfig holds lease manager parameters. -type LeaseManagerConfig struct { - // DescriptorLeaseDuration is the mean duration a lease will be - // acquired for. 
- DescriptorLeaseDuration time.Duration - - // DescriptorLeaseJitterFraction is the factor that we use to - // randomly jitter the lease duration when acquiring a new lease and - // the lease renewal timeout. - DescriptorLeaseJitterFraction float64 - - // DefaultDescriptorLeaseRenewalTimeout is the default time - // before a lease expires when acquisition to renew the lease begins. - DescriptorLeaseRenewalTimeout time.Duration -} - -// NewLeaseManagerConfig initializes a LeaseManagerConfig with default values. -func NewLeaseManagerConfig() *LeaseManagerConfig { - return &LeaseManagerConfig{ - DescriptorLeaseDuration: DefaultDescriptorLeaseDuration, - DescriptorLeaseJitterFraction: DefaultDescriptorLeaseJitterFraction, - DescriptorLeaseRenewalTimeout: DefaultDescriptorLeaseRenewalTimeout, - } -} diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index f2c3d64fc828..ca8cd5d674be 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -35,9 +35,6 @@ type TestServerArgs struct { *cluster.Settings RaftConfig - // LeaseManagerConfig holds configuration values specific to the LeaseManager. - LeaseManagerConfig *LeaseManagerConfig - // PartOfCluster must be set if the TestServer is joining others in a cluster. 
// If not set (and hence the server is the only one in the cluster), the // default zone config will be overridden to disable all replication - so that diff --git a/pkg/bench/ddl_analysis/testdata/benchmark_expectations b/pkg/bench/ddl_analysis/testdata/benchmark_expectations index 292e800e9ea0..841a9a61f33a 100644 --- a/pkg/bench/ddl_analysis/testdata/benchmark_expectations +++ b/pkg/bench/ddl_analysis/testdata/benchmark_expectations @@ -2,25 +2,25 @@ exp,benchmark 5,AlterRole/alter_role_with_1_option 6,AlterRole/alter_role_with_2_options 8,AlterRole/alter_role_with_3_options -17,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint -17,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints -17,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints -17,AlterTableAddColumn/alter_table_add_1_column -17,AlterTableAddColumn/alter_table_add_2_columns -17,AlterTableAddColumn/alter_table_add_3_columns -25,AlterTableAddForeignKey/alter_table_add_1_foreign_key -33,AlterTableAddForeignKey/alter_table_add_2_foreign_keys -41,AlterTableAddForeignKey/alter_table_add_3_foreign_keys -25,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns +19,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint +19,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints +19,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints +19,AlterTableAddColumn/alter_table_add_1_column +19,AlterTableAddColumn/alter_table_add_2_columns +19,AlterTableAddColumn/alter_table_add_3_columns +27,AlterTableAddForeignKey/alter_table_add_1_foreign_key +35,AlterTableAddForeignKey/alter_table_add_2_foreign_keys +43,AlterTableAddForeignKey/alter_table_add_3_foreign_keys +27,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns 27,AlterTableConfigureZone/alter_table_configure_zone_5_replicas 27,AlterTableConfigureZone/alter_table_configure_zone_7_replicas_ 27,AlterTableConfigureZone/alter_table_configure_zone_ranges 
-21,AlterTableDropColumn/alter_table_drop_1_column -25,AlterTableDropColumn/alter_table_drop_2_columns -29,AlterTableDropColumn/alter_table_drop_3_columns -18,AlterTableDropConstraint/alter_table_drop_1_check_constraint -19,AlterTableDropConstraint/alter_table_drop_2_check_constraints -20,AlterTableDropConstraint/alter_table_drop_3_check_constraints +23,AlterTableDropColumn/alter_table_drop_1_column +27,AlterTableDropColumn/alter_table_drop_2_columns +31,AlterTableDropColumn/alter_table_drop_3_columns +20,AlterTableDropConstraint/alter_table_drop_1_check_constraint +21,AlterTableDropConstraint/alter_table_drop_2_check_constraints +22,AlterTableDropConstraint/alter_table_drop_3_check_constraints 14-15,AlterTableSplit/alter_table_split_at_1_value 20-21,AlterTableSplit/alter_table_split_at_2_values 26-27,AlterTableSplit/alter_table_split_at_3_values @@ -31,27 +31,27 @@ exp,benchmark 7,CreateRole/create_role_with_2_options 8,CreateRole/create_role_with_3_options 6,CreateRole/create_role_with_no_options -19,DropDatabase/drop_database_0_tables -28,DropDatabase/drop_database_1_table -37,DropDatabase/drop_database_2_tables -46,DropDatabase/drop_database_3_tables +21,DropDatabase/drop_database_0_tables +30,DropDatabase/drop_database_1_table +39,DropDatabase/drop_database_2_tables +48,DropDatabase/drop_database_3_tables 13,DropRole/drop_1_role 21,DropRole/drop_2_roles 29,DropRole/drop_3_roles -19,DropSequence/drop_1_sequence -29,DropSequence/drop_2_sequences -39,DropSequence/drop_3_sequences -22,DropTable/drop_1_table -35,DropTable/drop_2_tables -48,DropTable/drop_3_tables -23,DropView/drop_1_view -40,DropView/drop_2_views -57,DropView/drop_3_views -19,Grant/grant_all_on_1_table -22,Grant/grant_all_on_2_tables -25,Grant/grant_all_on_3_tables -19,GrantRole/grant_1_role -22,GrantRole/grant_2_roles +21,DropSequence/drop_1_sequence +31,DropSequence/drop_2_sequences +41,DropSequence/drop_3_sequences +24,DropTable/drop_1_table +37,DropTable/drop_2_tables 
+50,DropTable/drop_3_tables +25,DropView/drop_1_view +42,DropView/drop_2_views +59,DropView/drop_3_views +21,Grant/grant_all_on_1_table +24,Grant/grant_all_on_2_tables +27,Grant/grant_all_on_3_tables +21,GrantRole/grant_1_role +24,GrantRole/grant_2_roles 2,ORMQueries/activerecord_type_introspection_query 2,ORMQueries/django_table_introspection_1_table 2,ORMQueries/django_table_introspection_4_tables @@ -60,19 +60,19 @@ exp,benchmark 2,ORMQueries/pg_class 2,ORMQueries/pg_namespace 2,ORMQueries/pg_type -19,Revoke/revoke_all_on_1_table -22,Revoke/revoke_all_on_2_tables -25,Revoke/revoke_all_on_3_tables -18,RevokeRole/revoke_1_role -20,RevokeRole/revoke_2_roles +21,Revoke/revoke_all_on_1_table +24,Revoke/revoke_all_on_2_tables +27,Revoke/revoke_all_on_3_tables +20,RevokeRole/revoke_1_role +22,RevokeRole/revoke_2_roles 1,SystemDatabaseQueries/select_system.users_with_empty_database_name 1,SystemDatabaseQueries/select_system.users_with_schema_name 2,SystemDatabaseQueries/select_system.users_without_schema_name -24,Truncate/truncate_1_column_0_rows -24,Truncate/truncate_1_column_1_row -24,Truncate/truncate_1_column_2_rows -24,Truncate/truncate_2_column_0_rows -24,Truncate/truncate_2_column_1_rows -24,Truncate/truncate_2_column_2_rows +26,Truncate/truncate_1_column_0_rows +26,Truncate/truncate_1_column_1_row +26,Truncate/truncate_1_column_2_rows +26,Truncate/truncate_2_column_0_rows +26,Truncate/truncate_2_column_1_rows +26,Truncate/truncate_2_column_2_rows 1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk 1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 987fe80ebc66..0f6a7f5ee2a5 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -82,18 +82,16 @@ func (c *rowFetcherCache) TableDescForKey( // Retrieve the target TableDescriptor from the lease manager. 
No caching // is attempted because the lease manager does its own caching. - desc, _, err := c.leaseMgr.Acquire(ctx, ts, tableID) + desc, err := c.leaseMgr.Acquire(ctx, ts, tableID) if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. return nil, MarkRetryableError(err) } + tableDesc = desc.Desc().(catalog.TableDescriptor) // Immediately release the lease, since we only need it for the exact // timestamp requested. - if err := c.leaseMgr.Release(desc); err != nil { - return nil, err - } - tableDesc = desc.(catalog.TableDescriptor) + desc.Release(ctx) if tableDesc.ContainsUserDefinedTypes() { // If the table contains user defined types, then use the descs.Collection // to retrieve a TableDescriptor with type metadata hydrated. We open a diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 2f7f46a1adb8..3dd194b67cad 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -338,22 +338,17 @@ func TestTxnCoordSenderEndTxn(t *testing.T) { case 1: // Past deadline. - if !txn.UpdateDeadlineMaybe(ctx, pushedTimestamp.Prev()) { - t.Fatalf("did not update deadline") - } - + err := txn.UpdateDeadline(ctx, pushedTimestamp.Prev()) + require.NoError(t, err, "Deadline update to past failed") case 2: // Equal deadline. - if !txn.UpdateDeadlineMaybe(ctx, pushedTimestamp) { - t.Fatalf("did not update deadline") - } + err := txn.UpdateDeadline(ctx, pushedTimestamp) + require.NoError(t, err, "Deadline update to equal failed") case 3: // Future deadline. 
- - if !txn.UpdateDeadlineMaybe(ctx, pushedTimestamp.Next()) { - t.Fatalf("did not update deadline") - } + err := txn.UpdateDeadline(ctx, pushedTimestamp.Next()) + require.NoError(t, err, "Deadline update to future failed") } err = txn.CommitOrCleanup(ctx) @@ -2163,11 +2158,12 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) { t.Run("standalone commit", func(t *testing.T) { txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) // Set a deadline. We'll generate a retriable error with a higher timestamp. - txn.UpdateDeadlineMaybe(ctx, clock.Now()) + err := txn.UpdateDeadline(ctx, clock.Now()) + require.NoError(t, err, "Deadline update to now failed") if _, err := txn.Get(ctx, "k"); err != nil { t.Fatal(err) } - err := txn.Commit(ctx) + err = txn.Commit(ctx) assertTransactionRetryError(t, err) if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") { t.Fatalf("expected deadline exceeded, got: %s", err) @@ -2177,10 +2173,11 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) { t.Run("commit in batch", func(t *testing.T) { txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) // Set a deadline. We'll generate a retriable error with a higher timestamp. - txn.UpdateDeadlineMaybe(ctx, clock.Now()) + err := txn.UpdateDeadline(ctx, clock.Now()) + require.NoError(t, err, "Deadline update to now failed") b := txn.NewBatch() b.Get("k") - err := txn.CommitInBatch(ctx, b) + err = txn.CommitInBatch(ctx, b) assertTransactionRetryError(t, err) if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") { t.Fatalf("expected deadline exceeded, got: %s", err) diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 29663d06cb9c..abf723af8446 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -684,29 +684,29 @@ func (txn *Txn) CommitOrCleanup(ctx context.Context) error { return err } -// UpdateDeadlineMaybe sets the transactions deadline to the lower of the -// current one (if any) and the passed value. -// -// The deadline cannot be lower than txn.ReadTimestamp. 
-func (txn *Txn) UpdateDeadlineMaybe(ctx context.Context, deadline hlc.Timestamp) bool { +// UpdateDeadline sets the transactions deadline to the passed deadline. +// It may move the deadline to any timestamp above the current read timestamp. +// If the deadline is below the current provisional commit timestamp (write timestamp), +// then the transaction will fail with a deadline error during the commit. +// The deadline cannot be lower than txn.ReadTimestamp and we make the assumption +// the read timestamp will not change during execution, which is valid today. +func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) error { if txn.typ != RootTxn { - panic(errors.WithContextTags(errors.AssertionFailedf("UpdateDeadlineMaybe() called on leaf txn"), ctx)) + panic(errors.WithContextTags(errors.AssertionFailedf("UpdateDeadline() called on leaf txn"), ctx)) } txn.mu.Lock() defer txn.mu.Unlock() - if txn.mu.deadline == nil || deadline.Less(*txn.mu.deadline) { - readTimestamp := txn.readTimestampLocked() - if deadline.Less(txn.readTimestampLocked()) { - log.Fatalf(ctx, "deadline below read timestamp is nonsensical; "+ - "txn has would have no change to commit. Deadline: %s. Read timestamp: %s.", - deadline, readTimestamp) - } - txn.mu.deadline = new(hlc.Timestamp) - *txn.mu.deadline = deadline - return true + + readTimestamp := txn.readTimestampLocked() + if deadline.Less(readTimestamp) { + log.Fatalf(ctx, "deadline below read timestamp is nonsensical; "+ + "txn has would have no chance to commit. Deadline: %s. Read timestamp: %s Previous Deadline: %s.", + deadline, readTimestamp, txn.mu.deadline) } - return false + txn.mu.deadline = new(hlc.Timestamp) + *txn.mu.deadline = deadline + return nil } // resetDeadlineLocked resets the deadline. 
diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index b68148e8fb49..1d4968998d9f 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -497,25 +497,24 @@ func TestUpdateDeadlineMaybe(t *testing.T) { } deadline := hlc.Timestamp{WallTime: 10, Logical: 1} - if !txn.UpdateDeadlineMaybe(ctx, deadline) { - t.Errorf("expected update, but it didn't happen") - } + err := txn.UpdateDeadline(ctx, deadline) + require.NoError(t, err, "Deadline update failed") if d := *txn.deadline(); d != deadline { t.Errorf("unexpected deadline: %s", d) } + // Deadline is always updated now, there is no + // maybe. futureDeadline := hlc.Timestamp{WallTime: 11, Logical: 1} - if txn.UpdateDeadlineMaybe(ctx, futureDeadline) { - t.Errorf("expected no update, but update happened") - } - if d := *txn.deadline(); d != deadline { + err = txn.UpdateDeadline(ctx, futureDeadline) + require.NoError(t, err, "Future deadline update failed") + if d := *txn.deadline(); d != futureDeadline { t.Errorf("unexpected deadline: %s", d) } pastDeadline := hlc.Timestamp{WallTime: 9, Logical: 1} - if !txn.UpdateDeadlineMaybe(ctx, pastDeadline) { - t.Errorf("expected update, but it didn't happen") - } + err = txn.UpdateDeadline(ctx, pastDeadline) + require.NoError(t, err, "Past deadline update failed") if d := *txn.deadline(); d != pastDeadline { t.Errorf("unexpected deadline: %s", d) } diff --git a/pkg/server/config.go b/pkg/server/config.go index 9d17b2b03a93..81f56fca8afc 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -307,9 +307,6 @@ type SQLConfig struct { // The tenant that the SQL server runs on the behalf of. TenantID roachpb.TenantID - // LeaseManagerConfig holds configuration values specific to the LeaseManager. - LeaseManagerConfig *base.LeaseManagerConfig - // SocketFile, if non-empty, sets up a TLS-free local listener using // a unix datagram socket at the specified path. 
SocketFile string @@ -347,7 +344,6 @@ func MakeSQLConfig(tenID roachpb.TenantID, tempStorageCfg base.TempStorageConfig TableStatCacheSize: defaultSQLTableStatCacheSize, QueryCacheSize: defaultSQLQueryCacheSize, TempStorageConfig: tempStorageCfg, - LeaseManagerConfig: base.NewLeaseManagerConfig(), } return sqlCfg } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 69b7635ca8ee..440b195966e9 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -313,7 +313,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { lmKnobs, cfg.stopper, cfg.rangeFeedFactory, - cfg.LeaseManagerConfig, ) cfg.registry.AddMetricStruct(leaseMgr.MetricsStruct()) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index cdcef72f58d3..bfdda7ec8f32 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -143,11 +143,6 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config { cfg.TestingKnobs = params.Knobs cfg.RaftConfig = params.RaftConfig cfg.RaftConfig.SetDefaults() - if params.LeaseManagerConfig != nil { - cfg.LeaseManagerConfig = params.LeaseManagerConfig - } else { - cfg.LeaseManagerConfig = base.NewLeaseManagerConfig() - } if params.JoinAddr != "" { cfg.JoinList = []string{params.JoinAddr} } diff --git a/pkg/settings/setting.go b/pkg/settings/setting.go index dcb3ce0a3fce..1aadc85df6a7 100644 --- a/pkg/settings/setting.go +++ b/pkg/settings/setting.go @@ -20,7 +20,7 @@ import ( // MaxSettings is the maximum number of settings that the system supports. // Exported for tests. -const MaxSettings = 288 +const MaxSettings = 512 // Values is a container that stores values for all registered settings. // Each setting is assigned a unique slot (up to MaxSettings). 
diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index e668ba91be18..0fd589a36265 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" @@ -58,21 +59,27 @@ type uncommittedDescriptor struct { // leasedDescriptors holds references to all the descriptors leased in the // transaction, and supports access by name and by ID. type leasedDescriptors struct { - descs []catalog.Descriptor + descs []lease.LeasedDescriptor } -func (ld *leasedDescriptors) add(desc catalog.Descriptor) { +func (ld *leasedDescriptors) add(desc lease.LeasedDescriptor) { ld.descs = append(ld.descs, desc) } -func (ld *leasedDescriptors) release(ids []descpb.ID) (toRelease []catalog.Descriptor) { +func (ld *leasedDescriptors) releaseAll() (toRelease []lease.LeasedDescriptor) { + toRelease = append(toRelease, ld.descs...) + ld.descs = ld.descs[:0] + return toRelease +} + +func (ld *leasedDescriptors) release(ids []descpb.ID) (toRelease []lease.LeasedDescriptor) { // Sort the descriptors and leases to make it easy to find the leases to release. 
leasedDescs := ld.descs sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) sort.Slice(leasedDescs, func(i, j int) bool { - return leasedDescs[i].GetID() < leasedDescs[j].GetID() + return leasedDescs[i].Desc().GetID() < leasedDescs[j].Desc().GetID() }) filteredLeases := leasedDescs[:0] // will store the remaining leases @@ -84,7 +91,7 @@ func (ld *leasedDescriptors) release(ids []descpb.ID) (toRelease []catalog.Descr return len(idsToConsider) > 0 && idsToConsider[0] == id } for _, l := range leasedDescs { - if !shouldRelease(l.GetID()) { + if !shouldRelease(l.Desc().GetID()) { filteredLeases = append(filteredLeases, l) } else { toRelease = append(toRelease, l) @@ -97,8 +104,8 @@ func (ld *leasedDescriptors) release(ids []descpb.ID) (toRelease []catalog.Descr func (ld *leasedDescriptors) getByID(id descpb.ID) catalog.Descriptor { for i := range ld.descs { desc := ld.descs[i] - if desc.GetID() == id { - return desc + if desc.Desc().GetID() == id { + return desc.Desc() } } return nil @@ -109,8 +116,8 @@ func (ld *leasedDescriptors) getByName( ) catalog.Descriptor { for i := range ld.descs { desc := ld.descs[i] - if lease.NameMatchesDescriptor(desc, dbID, schemaID, name) { - return desc + if lease.NameMatchesDescriptor(desc.Desc(), dbID, schemaID, name) { + return desc.Desc() } } return nil @@ -276,7 +283,7 @@ func (tc *Collection) getLeasedDescriptorByName( } readTimestamp := txn.ReadTimestamp() - desc, expiration, err := tc.leaseMgr.AcquireByName(ctx, readTimestamp, parentID, parentSchemaID, name) + ldesc, err := tc.leaseMgr.AcquireByName(ctx, readTimestamp, parentID, parentSchemaID, name) if err != nil { // Read the descriptor from the store in the face of some specific errors // because of a known limitation of AcquireByName. 
See the known @@ -290,21 +297,48 @@ func (tc *Collection) getLeasedDescriptorByName( return nil, false, err } + expiration := ldesc.Expiration() if expiration.LessEq(readTimestamp) { log.Fatalf(ctx, "bad descriptor for T=%s, expiration=%s", readTimestamp, expiration) } - tc.leasedDescriptors.add(desc) + tc.leasedDescriptors.add(ldesc) if log.V(2) { - log.Eventf(ctx, "added descriptor '%s' to collection: %+v", name, desc) + log.Eventf(ctx, "added descriptor '%s' to collection: %+v", name, ldesc.Desc()) } // If the descriptor we just acquired expires before the txn's deadline, // reduce the deadline. We use ReadTimestamp() that doesn't return the commit // timestamp, so we need to set a deadline on the transaction to prevent it // from committing beyond the version's expiration time. - txn.UpdateDeadlineMaybe(ctx, expiration) - return desc, false, nil + err = tc.MaybeUpdateDeadline(ctx, txn) + if err != nil { + return nil, false, err + } + return ldesc.Desc(), false, nil +} + +// Deadline returns the latest expiration from our leased +// descriptors which should be the transaction's deadline. +func (tc *Collection) Deadline() (deadline hlc.Timestamp, haveDeadline bool) { + for _, l := range tc.leasedDescriptors.descs { + expiration := l.Expiration() + if !haveDeadline || expiration.Less(deadline) { + haveDeadline = true + deadline = expiration + } + } + return deadline, haveDeadline +} + +// MaybeUpdateDeadline updates the deadline in a given transaction +// based on the leased descriptors in this collection. This update is +// only done when a deadline exists. 
+func (tc *Collection) MaybeUpdateDeadline(ctx context.Context, txn *kv.Txn) (err error) { + if deadline, haveDeadline := tc.Deadline(); haveDeadline { + err = txn.UpdateDeadline(ctx, deadline) + } + return err } // getLeasedDescriptorByID return a leased descriptor valid for the transaction, @@ -321,26 +355,25 @@ func (tc *Collection) getLeasedDescriptorByID( } readTimestamp := txn.ReadTimestamp() - desc, expiration, err := tc.leaseMgr.Acquire(ctx, readTimestamp, id) + desc, err := tc.leaseMgr.Acquire(ctx, readTimestamp, id) if err != nil { return nil, err } - + expiration := desc.Expiration() if expiration.LessEq(readTimestamp) { log.Fatalf(ctx, "bad descriptor for T=%s, expiration=%s", readTimestamp, expiration) } tc.leasedDescriptors.add(desc) - log.VEventf(ctx, 2, "added descriptor %q to collection", desc.GetName()) + log.VEventf(ctx, 2, "added descriptor %q to collection", desc.Desc().GetName()) if setTxnDeadline { - // If the descriptor we just acquired expires before the txn's deadline, - // reduce the deadline. We use ReadTimestamp() that doesn't return the commit - // timestamp, so we need to set a deadline on the transaction to prevent it - // from committing beyond the version's expiration time. - txn.UpdateDeadlineMaybe(ctx, expiration) + err := tc.MaybeUpdateDeadline(ctx, txn) + if err != nil { + return nil, err + } } - return desc, nil + return desc.Desc(), nil } // getDescriptorFromStore gets a descriptor from its namespace entry. It does @@ -632,6 +665,7 @@ func (tc *Collection) getTableByName( if err != nil { return false, nil, err } + return true, hydrated, nil } @@ -1322,21 +1356,16 @@ func (tc *Collection) ReleaseSpecifiedLeases(ctx context.Context, descs []lease. } toRelease := tc.leasedDescriptors.release(ids) for _, desc := range toRelease { - if err := tc.leaseMgr.Release(desc); err != nil { - log.Warningf(ctx, "%v", err) - } + desc.Release(ctx) } } // ReleaseLeases releases all leases. Errors are logged but ignored. 
func (tc *Collection) ReleaseLeases(ctx context.Context) { log.VEventf(ctx, 2, "releasing %d descriptors", tc.leasedDescriptors.numDescriptors()) - for _, desc := range tc.leasedDescriptors.descs { - if err := tc.leaseMgr.Release(desc); err != nil { - log.Warningf(ctx, "%v", err) - } + for _, desc := range tc.leasedDescriptors.releaseAll() { + desc.Release(ctx) } - tc.leasedDescriptors.descs = tc.leasedDescriptors.descs[:0] } // ReleaseAll releases all state currently held by the Collection. diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index df90906f73fb..c3391daf8a70 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -60,6 +60,7 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", diff --git a/pkg/sql/catalog/lease/helpers_test.go b/pkg/sql/catalog/lease/helpers_test.go index c728186d4e67..95dc962d5971 100644 --- a/pkg/sql/catalog/lease/helpers_test.go +++ b/pkg/sql/catalog/lease/helpers_test.go @@ -107,12 +107,18 @@ func (w *LeaseRemovalTracker) LeaseRemovedNotification( } } +// ExpireLeases is a hack for testing that manually sets expirations to a past +// timestamp. 
func (m *Manager) ExpireLeases(clock *hlc.Clock) { - past := clock.Now().GoTime().Add(-time.Millisecond) + past := hlc.Timestamp{ + WallTime: clock.Now().GoTime().Add(-time.Millisecond).UnixNano(), + } m.names.mu.Lock() for _, desc := range m.names.descriptors { - desc.expiration = hlc.Timestamp{WallTime: past.UnixNano()} + desc.mu.Lock() + desc.mu.expiration = past + desc.mu.Unlock() } m.names.mu.Unlock() } diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 6892138e45c2..d3e906c7134f 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -55,6 +55,33 @@ import ( var errRenewLease = errors.New("renew lease on id") var errReadOlderVersion = errors.New("read older descriptor version from store") +// LeaseDuration controls the duration of sql descriptor leases. +var LeaseDuration = settings.RegisterDurationSetting( + "sql.catalog.descriptor_lease_duration", + "mean duration of sql descriptor leases, this actual duration is jitterred", + base.DefaultDescriptorLeaseDuration) + +func between0and1inclusive(f float64) error { + if f < 0 || f > 1 { + return errors.Errorf("value %f must be between 0 and 1", f) + } + return nil +} + +// LeaseJitterFraction controls the percent jitter around sql lease durations +var LeaseJitterFraction = settings.RegisterFloatSetting( + "sql.catalog.descriptor_lease_jitter_fraction", + "mean duration of sql descriptor leases, this actual duration is jitterred", + base.DefaultDescriptorLeaseJitterFraction, + between0and1inclusive) + +// LeaseRenewalDuration controls the default time before a lease expires when +// acquisition to renew the lease begins. +var LeaseRenewalDuration = settings.RegisterDurationSetting( + "sql.catalog.descriptor_lease_renewal_fraction", + "controls the default time before a lease expires when acquisition to renew the lease begins", + base.DefaultDescriptorLeaseRenewalTimeout) + // A lease stored in system.lease. 
type storedLease struct { id descpb.ID @@ -62,27 +89,32 @@ type storedLease struct { expiration tree.DTimestamp } +func (s *storedLease) String() string { + return fmt.Sprintf("ID = %d ver=%d expiration=%s", s.id, s.version, s.expiration) +} + // descriptorVersionState holds the state for a descriptor version. This // includes the lease information for a descriptor version. // TODO(vivek): A node only needs to manage lease information on what it // thinks is the latest version for a descriptor. type descriptorVersionState struct { + t *descriptorState // This descriptor is immutable and can be shared by many goroutines. // Care must be taken to not modify it. catalog.Descriptor - // The expiration time for the descriptor version. A transaction with - // timestamp T can use this descriptor version iff - // Descriptor.GetDescriptorModificationTime() <= T < expiration - // - // The expiration time is either the expiration time of the lease when a lease - // is associated with the version, or the ModificationTime of the next version - // when the version isn't associated with a lease. - expiration hlc.Timestamp - mu struct { syncutil.Mutex + // The expiration time for the descriptor version. A transaction with + // timestamp T can use this descriptor version iff + // Descriptor.GetDescriptorModificationTime() <= T < expiration + // + // The expiration time is either the expiration time of the lease when a lease + // is associated with the version, or the ModificationTime of the next version + // when the version isn't associated with a lease. + expiration hlc.Timestamp + refcount int // Set if the node has a lease on this descriptor version. 
// Leases can only be held for the two latest versions of @@ -94,6 +126,26 @@ type descriptorVersionState struct { } } +func (s *descriptorVersionState) Release(ctx context.Context) { + s.t.release(ctx, s) +} + +func (s *descriptorVersionState) Desc() catalog.Descriptor { + return s.Descriptor +} + +func (s *descriptorVersionState) Expiration() hlc.Timestamp { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.expiration +} + +func (s *descriptorVersionState) SafeMessage() string { + s.mu.Lock() + defer s.mu.Unlock() + return fmt.Sprintf("%d ver=%d:%s, refcount=%d", s.GetID(), s.GetVersion(), s.mu.expiration, s.mu.refcount) +} + func (s *descriptorVersionState) String() string { s.mu.Lock() defer s.mu.Unlock() @@ -102,21 +154,21 @@ func (s *descriptorVersionState) String() string { // stringLocked reads mu.refcount and thus needs to have mu held. func (s *descriptorVersionState) stringLocked() string { - return fmt.Sprintf("%d(%q) ver=%d:%s, refcount=%d", s.GetID(), s.GetName(), s.GetVersion(), s.expiration, s.mu.refcount) + return fmt.Sprintf("%d(%q) ver=%d:%s, refcount=%d", s.GetID(), s.GetName(), s.GetVersion(), s.mu.expiration, s.mu.refcount) } // hasExpired checks if the descriptor is too old to be used (by a txn // operating) at the given timestamp. func (s *descriptorVersionState) hasExpired(timestamp hlc.Timestamp) bool { - return s.expiration.LessEq(timestamp) + s.mu.Lock() + defer s.mu.Unlock() + return s.hasExpiredLocked(timestamp) } -// hasValidExpiration checks that this descriptor has a later expiration than -// the existing one it is replacing. This can be used to check the -// monotonicity of the expiration times on a descriptor at a particular version. -// The version is not explicitly checked here. -func (s *descriptorVersionState) hasValidExpiration(existing *descriptorVersionState) bool { - return existing.expiration.Less(s.expiration) +// hasExpired checks if the descriptor is too old to be used (by a txn +// operating) at the given timestamp. 
+func (s *descriptorVersionState) hasExpiredLocked(timestamp hlc.Timestamp) bool { + return s.mu.expiration.LessEq(timestamp) } func (s *descriptorVersionState) incRefcount() { @@ -132,6 +184,12 @@ func (s *descriptorVersionState) incRefcountLocked() { } } +func (s *descriptorVersionState) getExpiration() hlc.Timestamp { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.expiration +} + // The lease expiration stored in the database is of a different type. // We've decided that it's too much work to change the type to // hlc.Timestamp, so we're using this method to give us the stored @@ -157,30 +215,17 @@ type storage struct { // concurrent lease acquisitions from the store. group *singleflight.Group - // leaseDuration is the mean duration a lease will be acquired for. The - // actual duration is jittered using leaseJitterFraction. Jittering is done to - // prevent multiple leases from being renewed simultaneously if they were all - // acquired simultaneously. - leaseDuration time.Duration - // leaseJitterFraction is the factor that we use to randomly jitter the lease - // duration when acquiring a new lease and the lease renewal timeout. The - // range of the actual lease duration will be - // [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration] - leaseJitterFraction float64 - // leaseRenewalTimeout is the time before a lease expires when - // acquisition to renew the lease begins. - leaseRenewalTimeout time.Duration - outstandingLeases *metric.Gauge - - testingKnobs StorageTestingKnobs + testingKnobs StorageTestingKnobs } // jitteredLeaseDuration returns a randomly jittered duration from the interval // [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration]. 
func (s storage) jitteredLeaseDuration() time.Duration { - return time.Duration(float64(s.leaseDuration) * (1 - s.leaseJitterFraction + - 2*s.leaseJitterFraction*rand.Float64())) + leaseDuration := LeaseDuration.Get(&s.settings.SV) + jitterFraction := LeaseJitterFraction.Get(&s.settings.SV) + return time.Duration(float64(leaseDuration) * (1 - jitterFraction + + 2*jitterFraction*rand.Float64())) } // acquire a lease on the most recent version of a descriptor. If the lease @@ -189,9 +234,8 @@ func (s storage) jitteredLeaseDuration() time.Duration { // inactiveTableError. The expiration time set for the lease > minExpiration. func (s storage) acquire( ctx context.Context, minExpiration hlc.Timestamp, id descpb.ID, -) (*descriptorVersionState, error) { - var descVersionState *descriptorVersionState - err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +) (desc catalog.Descriptor, expiration hlc.Timestamp, _ error) { + err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { // Run the descriptor read as high-priority, thereby pushing any intents out // of its way. We don't want schema changes to prevent lease acquisitions; // we'd rather force them to refresh. Also this prevents deadlocks in cases @@ -200,7 +244,7 @@ func (s storage) acquire( if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil { return err } - expiration := txn.ReadTimestamp().Add(int64(s.jitteredLeaseDuration()), 0) + expiration = txn.ReadTimestamp().Add(int64(s.jitteredLeaseDuration()), 0) if expiration.LessEq(minExpiration) { // In the rare circumstances where expiration <= minExpiration // use an expiration based on the minExpiration to guarantee @@ -212,7 +256,7 @@ func (s storage) acquire( // to ValidateSelf() instead of Validate(), to avoid the cross-table // checks. Does this actually matter? We already potentially do cross-table // checks when populating pre-19.2 foreign keys. 
- desc, err := catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id) + desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id) if err != nil { return err } @@ -221,20 +265,7 @@ func (s storage) acquire( ); err != nil { return err } - // Once the descriptor is set it is immutable and care must be taken - // to not modify it. - storedLease := &storedLease{ - id: desc.GetID(), - version: int(desc.GetVersion()), - expiration: storedLeaseExpiration(expiration), - } - descVersionState = &descriptorVersionState{ - Descriptor: desc, - expiration: expiration, - } - log.VEventf(ctx, 2, "storage acquired lease %+v", storedLease) - descVersionState.mu.lease = storedLease - + log.VEventf(ctx, 2, "storage acquired lease %v@%v", desc, expiration) nodeID := s.nodeIDContainer.SQLInstanceID() if nodeID == 0 { panic("zero nodeID") @@ -246,9 +277,10 @@ func (s storage) acquire( // general cost of preparing, preparing this statement always requires a // read from the database for the special descriptor of a system table // (#23937). + ts := storedLeaseExpiration(expiration) insertLease := fmt.Sprintf( `INSERT INTO system.public.lease ("descID", version, "nodeID", expiration) VALUES (%d, %d, %d, %s)`, - storedLease.id, storedLease.version, nodeID, &storedLease.expiration, + desc.GetID(), desc.GetVersion(), nodeID, &ts, ) count, err := s.internalExecutor.Exec(ctx, "lease-insert", txn, insertLease) if err != nil { @@ -260,10 +292,13 @@ func (s storage) acquire( s.outstandingLeases.Inc(1) return nil }) - if err == nil && s.testingKnobs.LeaseAcquiredEvent != nil { - s.testingKnobs.LeaseAcquiredEvent(descVersionState.Descriptor, nil) + if err != nil { + return nil, hlc.Timestamp{}, err + } + if s.testingKnobs.LeaseAcquiredEvent != nil { + s.testingKnobs.LeaseAcquiredEvent(desc, err) } - return descVersionState, err + return desc, expiration, nil } // Release a previously acquired descriptor. Never let this method @@ -409,12 +444,12 @@ func CountLeases( // layer GC threshold. 
func (s storage) getForExpiration( ctx context.Context, expiration hlc.Timestamp, id descpb.ID, -) (*descriptorVersionState, error) { - var descVersionState *descriptorVersionState - err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +) (catalog.Descriptor, error) { + var desc catalog.Descriptor + err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { prevTimestamp := expiration.Prev() txn.SetFixedTimestamp(ctx, prevTimestamp) - desc, err := catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id) + desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id) if err != nil { return err } @@ -424,19 +459,14 @@ func (s storage) getForExpiration( id, expiration, desc.GetModificationTime()) } // Create a descriptorVersionState with the descriptor and without a lease. - descVersionState = &descriptorVersionState{ - Descriptor: desc, - expiration: expiration, - } return nil }) - return descVersionState, err + return desc, err } -// leaseToken is an opaque token representing a lease. It's distinct from a -// lease to define restricted capabilities and prevent improper use of a lease -// where we instead have leaseTokens. -type leaseToken *descriptorVersionState +func (s storage) leaseRenewalTimeout() time.Duration { + return LeaseRenewalDuration.Get(&s.settings.SV) +} // descriptorSet maintains an ordered set of descriptorVersionState objects // sorted by version. 
It supports addition and removal of elements, finding the @@ -458,7 +488,7 @@ func (l *descriptorSet) String() string { if i > 0 { buf.WriteString(" ") } - buf.WriteString(fmt.Sprintf("%d:%d", s.GetVersion(), s.expiration.WallTime)) + buf.WriteString(fmt.Sprintf("%d:%d", s.GetVersion(), s.getExpiration().WallTime)) } return buf.String() } @@ -534,6 +564,15 @@ func (l *descriptorSet) findVersion(version descpb.DescriptorVersion) *descripto } type descriptorState struct { + m *Manager + id descpb.ID + stopper *stop.Stopper + + // renewalInProgress is an atomic indicator for when a renewal for a + // lease has begun. This is atomic to prevent multiple routines from + // entering renewal initialization. + renewalInProgress int32 + mu struct { syncutil.Mutex @@ -568,14 +607,6 @@ type descriptorState struct { // ignored. acquisitionsInProgress int } - - stopper *stop.Stopper - id descpb.ID - - // renewalInProgress is an atomic indicator for when a renewal for a - // lease has begun. This is atomic to prevent multiple routines from - // entering renewal initialization. - renewalInProgress int32 } // ensureVersion ensures that the latest version >= minVersion. It will @@ -653,6 +684,11 @@ func (t *descriptorState) findForTimestamp( return nil, false, errReadOlderVersion } +type historicalDescriptor struct { + desc catalog.Descriptor + expiration hlc.Timestamp // ModificationTime of the next descriptor +} + // Read an older descriptor version for the particular timestamp // from the store. We unfortunately need to read more than one descriptor // version just so that we can set the expiration time on the descriptor @@ -666,7 +702,7 @@ func (t *descriptorState) findForTimestamp( // They are currently purged in PurgeOldVersions. 
func (m *Manager) readOlderVersionForTimestamp( ctx context.Context, id descpb.ID, timestamp hlc.Timestamp, -) ([]*descriptorVersionState, error) { +) ([]historicalDescriptor, error) { expiration, done := func() (hlc.Timestamp, bool) { t := m.findDescriptorState(id, false /* create */) t.mu.Lock() @@ -676,9 +712,9 @@ func (m *Manager) readOlderVersionForTimestamp( for i := len(t.mu.active.data) - 1; i >= 0; i-- { // Check to see if the ModificationTime is valid. if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) { - if timestamp.Less(desc.expiration) { + if expiration := desc.getExpiration(); timestamp.Less(expiration) { // Existing valid descriptor version. - return desc.expiration, true + return expiration, true } // We need a version after data[i], but before data[i+1]. // We could very well use the timestamp to read the @@ -705,13 +741,16 @@ func (m *Manager) readOlderVersionForTimestamp( } // Read descriptors from the store. - var versions []*descriptorVersionState + var versions []historicalDescriptor for { desc, err := m.storage.getForExpiration(ctx, expiration, id) if err != nil { return nil, err } - versions = append(versions, desc) + versions = append(versions, historicalDescriptor{ + desc: desc, + expiration: expiration, + }) if desc.GetModificationTime().LessEq(timestamp) { break } @@ -724,17 +763,18 @@ func (m *Manager) readOlderVersionForTimestamp( // Insert descriptor versions. The versions provided are not in // any particular order. -func (m *Manager) insertDescriptorVersions(id descpb.ID, versions []*descriptorVersionState) { +func (m *Manager) insertDescriptorVersions(id descpb.ID, versions []historicalDescriptor) { t := m.findDescriptorState(id, false /* create */) t.mu.Lock() defer t.mu.Unlock() - for _, version := range versions { + for i := range versions { // Since we gave up the lock while reading the versions from // the store we have to ensure that no one else inserted the // same version. 
- existingVersion := t.mu.active.findVersion(version.GetVersion()) + existingVersion := t.mu.active.findVersion(versions[i].desc.GetVersion()) if existingVersion == nil { - t.mu.active.insert(version) + t.mu.active.insert( + newDescriptorVersionState(t, versions[i].desc, versions[i].expiration, false)) } } } @@ -785,12 +825,14 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er return nil } -// upsertLocked inserts a lease for a particular descriptor version. +var _ redact.SafeMessager = (*descriptorVersionState)(nil) + +// upsertLeaseLocked inserts a lease for a particular descriptor version. // If an existing lease exists for the descriptor version it replaces // it and returns it. -func (t *descriptorState) upsertLocked( - ctx context.Context, desc *descriptorVersionState, -) (_ *storedLease, _ error) { +func (t *descriptorState) upsertLeaseLocked( + ctx context.Context, desc catalog.Descriptor, expiration hlc.Timestamp, +) (createdDescriptorVersionState *descriptorVersionState, toRelease *storedLease, _ error) { if t.mu.maxVersionSeen < desc.GetVersion() { t.mu.maxVersionSeen = desc.GetVersion() } @@ -799,37 +841,55 @@ func (t *descriptorState) upsertLocked( if t.mu.active.findNewest() != nil { log.Infof(ctx, "new lease: %s", desc) } - t.mu.active.insert(desc) - return nil, nil + descState := newDescriptorVersionState(t, desc, expiration, true /* isLease */) + t.mu.active.insert(descState) + return descState, nil, nil } + s.mu.Lock() + defer s.mu.Unlock() // The desc is replacing an existing one at the same version. - if !desc.hasValidExpiration(s) { + if !s.mu.expiration.Less(expiration) { // This is a violation of an invariant and can actually not // happen. We return an error here to aid in further investigations. 
- return nil, errors.Errorf("lease expiration monotonicity violation, (%s) vs (%s)", s, desc) + return nil, nil, errors.AssertionFailedf("lease expiration monotonicity violation, (%s) vs (%s)", s, desc) } - s.mu.Lock() - desc.mu.Lock() // subsume the refcount of the older lease. This is permitted because // the new lease has a greater expiration than the older lease and // any transaction using the older lease can safely use a deadline set // to the older lease's expiration even though the older lease is // released! This is because the new lease is valid at the same desc // version at a greater expiration. - desc.mu.refcount += s.mu.refcount - s.mu.refcount = 0 - l := s.mu.lease - s.mu.lease = nil - if log.V(2) { - log.VEventf(ctx, 2, "replaced lease: %s with %s", s.stringLocked(), desc.stringLocked()) + s.mu.expiration = expiration + toRelease = s.mu.lease + s.mu.lease = &storedLease{ + id: desc.GetID(), + version: int(desc.GetVersion()), + expiration: storedLeaseExpiration(expiration), } - desc.mu.Unlock() - s.mu.Unlock() - t.mu.active.remove(s) - t.mu.active.insert(desc) - return l, nil + if log.ExpensiveLogEnabled(ctx, 2) { + log.VEventf(ctx, 2, "replaced lease: %s with %s", toRelease, s.mu.lease) + } + return nil, toRelease, nil +} + +func newDescriptorVersionState( + t *descriptorState, desc catalog.Descriptor, expiration hlc.Timestamp, isLease bool, +) *descriptorVersionState { + descState := &descriptorVersionState{ + t: t, + Descriptor: desc, + } + descState.mu.expiration = expiration + if isLease { + descState.mu.lease = &storedLease{ + id: desc.GetID(), + version: int(desc.GetVersion()), + expiration: storedLeaseExpiration(expiration), + } + } + return descState } // removeInactiveVersions removes inactive versions in t.mu.active.data with @@ -873,9 +933,9 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro newest := m.findNewest(id) var minExpiration hlc.Timestamp if newest != nil { - minExpiration = newest.expiration + 
minExpiration = newest.getExpiration() } - desc, err := m.storage.acquire(newCtx, minExpiration, id) + desc, expiration, err := m.storage.acquire(newCtx, minExpiration, id) if err != nil { return nil, err } @@ -883,15 +943,18 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro t.mu.Lock() t.mu.takenOffline = false defer t.mu.Unlock() - toRelease, err = t.upsertLocked(newCtx, desc) + var newDescVersionState *descriptorVersionState + newDescVersionState, toRelease, err = t.upsertLeaseLocked(newCtx, desc, expiration) if err != nil { return nil, err } - m.names.insert(desc) + if newDescVersionState != nil { + m.names.insert(newDescVersionState) + } if toRelease != nil { releaseLease(toRelease, m) } - return leaseToken(desc), nil + return true, nil }) select { case <-ctx.Done(): @@ -906,23 +969,15 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro // release returns a descriptorVersionState that needs to be released from // the store. -func (t *descriptorState) release( - desc catalog.Descriptor, removeOnceDereferenced bool, -) (*storedLease, error) { - t.mu.Lock() - defer t.mu.Unlock() +func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState) { - s := t.mu.active.find(desc.GetVersion()) - if s == nil { - return nil, errors.Errorf("descriptor %d version %d not found", desc.GetID(), desc.GetVersion()) - } // Decrements the refcount and returns true if the lease has to be removed // from the store. decRefcount := func(s *descriptorVersionState) *storedLease { // Figure out if we'd like to remove the lease from the store asap (i.e. // when the refcount drops to 0). If so, we'll need to mark the lease as // invalid. - removeOnceDereferenced = removeOnceDereferenced || + removeOnceDereferenced := t.m.removeOnceDereferenced() || // Release from the store if the descriptor has been dropped or taken // offline. 
t.mu.takenOffline || @@ -948,11 +1003,18 @@ func (t *descriptorState) release( } return nil } - if l := decRefcount(s); l != nil { - t.mu.active.remove(s) - return l, nil + maybeRemoveLease := func() *storedLease { + t.mu.Lock() + defer t.mu.Unlock() + if l := decRefcount(s); l != nil { + t.mu.active.remove(s) + return l + } + return nil + } + if l := maybeRemoveLease(); l != nil { + releaseLease(l, t.m) } - return nil, nil } // releaseLease from store. @@ -1027,20 +1089,16 @@ func purgeOldVersions( // Acquire a refcount on the descriptor on the latest version to maintain an // active lease, so that it doesn't get released when removeInactives() // is called below. Release this lease after calling removeInactives(). - newest := m.findNewest(id) - if newest == nil || newest.hasExpired(m.storage.clock.Now()) { - return errRenewLease - } - newest.incRefcount() - removeInactives(false /* dropped */) - s, err := t.release(newest.Descriptor, m.removeOnceDereferenced()) - if err != nil { - return err - } - if s != nil { - releaseLease(s, m) + desc, _, err := t.findForTimestamp(ctx, m.storage.clock.Now()) + if isInactive := catalog.HasInactiveDescriptorError(err); err == nil || isInactive { + removeInactives(isInactive) + if desc != nil { + t.release(ctx, desc) + return nil + } + return nil } - return nil + return err } // maybeQueueLeaseRenewal queues a lease renewal if there is not already a lease @@ -1212,7 +1270,7 @@ func (c *nameCache) get( } // Expired descriptor. Don't hand it out. - if desc.hasExpired(timestamp) { + if desc.hasExpiredLocked(timestamp) { return nil } @@ -1233,7 +1291,8 @@ func (c *nameCache) insert(desc *descriptorVersionState) { // If we already have a lease in the cache for this name, see if this one is // better (higher version or later expiration). 
if desc.GetVersion() > existing.GetVersion() || - (desc.GetVersion() == existing.GetVersion() && desc.hasValidExpiration(existing)) { + (desc.GetVersion() == existing.GetVersion() && + existing.getExpiration().Less(desc.getExpiration())) { // Overwrite the old lease. The new one is better. From now on, we want // clients to use the new one. c.descriptors[key] = desc @@ -1320,21 +1379,17 @@ func NewLeaseManager( testingKnobs ManagerTestingKnobs, stopper *stop.Stopper, rangeFeedFactory *rangefeed.Factory, - cfg *base.LeaseManagerConfig, ) *Manager { lm := &Manager{ storage: storage{ - nodeIDContainer: nodeIDContainer, - db: db, - clock: clock, - internalExecutor: internalExecutor, - settings: settings, - codec: codec, - group: &singleflight.Group{}, - leaseDuration: cfg.DescriptorLeaseDuration, - leaseJitterFraction: cfg.DescriptorLeaseJitterFraction, - leaseRenewalTimeout: cfg.DescriptorLeaseRenewalTimeout, - testingKnobs: testingKnobs.LeaseStoreTestingKnobs, + nodeIDContainer: nodeIDContainer, + db: db, + clock: clock, + internalExecutor: internalExecutor, + settings: settings, + codec: codec, + group: &singleflight.Group{}, + testingKnobs: testingKnobs.LeaseStoreTestingKnobs, outstandingLeases: metric.NewGauge(metric.Metadata{ Name: "sql.leases.active", Help: "The number of outstanding SQL schema leases.", @@ -1401,54 +1456,50 @@ func (m *Manager) AcquireByName( parentID descpb.ID, parentSchemaID descpb.ID, name string, -) (catalog.Descriptor, hlc.Timestamp, error) { +) (LeasedDescriptor, error) { // When offline descriptor leases were not allowed to be cached, // attempt to acquire a lease on them would generate a descriptor // offline error. Recent changes allow offline descriptor leases // to be cached, but callers still need the offline error generated. // This logic will release the lease (the lease manager will still // cache it), and generate the offline descriptor error. 
- validateDescriptorForReturn := func(desc catalog.Descriptor, - expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) { - if desc.Offline() { + validateDescriptorForReturn := func(desc LeasedDescriptor) (LeasedDescriptor, error) { + if desc.Desc().Offline() { if err := catalog.FilterDescriptorState( - desc, tree.CommonLookupFlags{}, + desc.Desc(), tree.CommonLookupFlags{}, ); err != nil { - releaseErr := m.Release(desc) - if releaseErr != nil { - log.Warningf(ctx, "error releasing lease: %s", releaseErr) - } - return nil, hlc.Timestamp{}, err + desc.Release(ctx) + return nil, err } } - return desc, expiration, nil + return desc, nil } - // Check if we have cached an ID for this name. descVersion := m.names.get(parentID, parentSchemaID, name, timestamp) if descVersion != nil { if descVersion.GetModificationTime().LessEq(timestamp) { + expiration := descVersion.getExpiration() // If this lease is nearly expired, ensure a renewal is queued. - durationUntilExpiry := time.Duration(descVersion.expiration.WallTime - timestamp.WallTime) - if durationUntilExpiry < m.storage.leaseRenewalTimeout { + durationUntilExpiry := time.Duration(expiration.WallTime - timestamp.WallTime) + if durationUntilExpiry < m.storage.leaseRenewalTimeout() { if t := m.findDescriptorState(descVersion.GetID(), false /* create */); t != nil { if err := t.maybeQueueLeaseRenewal( ctx, m, descVersion.GetID(), name); err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } } } - return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration) - } - if err := m.Release(descVersion); err != nil { - return nil, hlc.Timestamp{}, err + return validateDescriptorForReturn(descVersion) } + // m.names.get() incremented the refcount, we decrement it to get a new + // version. + descVersion.Release(ctx) // Return a valid descriptor for the timestamp. 
- desc, expiration, err := m.Acquire(ctx, timestamp, descVersion.GetID()) + leasedDesc, err := m.Acquire(ctx, timestamp, descVersion.GetID()) if err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } - return validateDescriptorForReturn(desc, expiration) + return validateDescriptorForReturn(leasedDesc) } // We failed to find something in the cache, or what we found is not @@ -1458,13 +1509,13 @@ func (m *Manager) AcquireByName( var err error id, err := m.resolveName(ctx, timestamp, parentID, parentSchemaID, name) if err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } - desc, expiration, err := m.Acquire(ctx, timestamp, id) + desc, err := m.Acquire(ctx, timestamp, id) if err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } - if !NameMatchesDescriptor(desc, parentID, parentSchemaID, name) { + if !NameMatchesDescriptor(desc.Desc(), parentID, parentSchemaID, name) { // We resolved name `name`, but the lease has a different name in it. // That can mean two things. Assume the descriptor is being renamed from A to B. // a) `name` is A. The transaction doing the RENAME committed (so the @@ -1498,26 +1549,22 @@ func (m *Manager) AcquireByName( // // TODO(vivek): check if the entire above comment is indeed true. Review the // use of NameMatchesDescriptor() throughout this function. - if err := m.Release(desc); err != nil { - log.Warningf(ctx, "error releasing lease: %s", err) - } + desc.Release(ctx) if err := m.AcquireFreshestFromStore(ctx, id); err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } - desc, expiration, err = m.Acquire(ctx, timestamp, id) + desc, err = m.Acquire(ctx, timestamp, id) if err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } - if !NameMatchesDescriptor(desc, parentID, parentSchemaID, name) { + if !NameMatchesDescriptor(desc.Desc(), parentID, parentSchemaID, name) { // If the name we had doesn't match the newest descriptor in the DB, then // we're trying to use an old name. 
- if err := m.Release(desc); err != nil { - log.Warningf(ctx, "error releasing lease: %s", err) - } - return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound + desc.Release(ctx) + return nil, catalog.ErrDescriptorNotFound } } - return validateDescriptorForReturn(desc, expiration) + return validateDescriptorForReturn(desc) } // resolveName resolves a descriptor name to a descriptor ID at a particular @@ -1560,6 +1607,14 @@ func (m *Manager) resolveName( return id, nil } +// LeasedDescriptor tracks and manages leasing related +// information for a descriptor. +type LeasedDescriptor interface { + Desc() catalog.Descriptor + Expiration() hlc.Timestamp + Release(ctx context.Context) +} + // Acquire acquires a read lease for the specified descriptor ID valid for // the timestamp. It returns the descriptor and a expiration time. // A transaction using this descriptor must ensure that its @@ -1573,21 +1628,21 @@ func (m *Manager) resolveName( // can be leased; as it stands a dropped descriptor cannot be leased. func (m *Manager) Acquire( ctx context.Context, timestamp hlc.Timestamp, id descpb.ID, -) (catalog.Descriptor, hlc.Timestamp, error) { +) (LeasedDescriptor, error) { for { t := m.findDescriptorState(id, true /*create*/) desc, latest, err := t.findForTimestamp(ctx, timestamp) if err == nil { // If the latest lease is nearly expired, ensure a renewal is queued. 
if latest { - durationUntilExpiry := time.Duration(desc.expiration.WallTime - timestamp.WallTime) - if durationUntilExpiry < m.storage.leaseRenewalTimeout { + durationUntilExpiry := time.Duration(desc.getExpiration().WallTime - timestamp.WallTime) + if durationUntilExpiry < m.storage.leaseRenewalTimeout() { if err := t.maybeQueueLeaseRenewal(ctx, m, id, desc.GetName()); err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } } } - return desc.Descriptor, desc.expiration, nil + return desc, nil } switch { case errors.Is(err, errRenewLease): @@ -1598,7 +1653,7 @@ func (m *Manager) Acquire( _, errLease := acquireNodeLease(ctx, m, id) return errLease }(); err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { @@ -1609,38 +1664,16 @@ func (m *Manager) Acquire( // Read old versions from the store. This can block while reading. versions, errRead := m.readOlderVersionForTimestamp(ctx, id, timestamp) if errRead != nil { - return nil, hlc.Timestamp{}, errRead + return nil, errRead } m.insertDescriptorVersions(id, versions) default: - return nil, hlc.Timestamp{}, err + return nil, err } } } -// Release releases a previously acquired lease. -func (m *Manager) Release(desc catalog.Descriptor) error { - t := m.findDescriptorState(desc.GetID(), false /* create */) - if t == nil { - return errors.Errorf("descriptor %d not found", desc.GetID()) - } - // TODO(pmattis): Can/should we delete from Manager.descriptors if the - // descriptorState becomes empty? - // TODO(andrei): I think we never delete from Manager.descriptors... which - // could be bad if a lot of descriptors keep being created. I looked into cleaning - // up a bit, but it seems tricky to do with the current locking which is split - // between Manager and descriptorState. 
- l, err := t.release(desc, m.removeOnceDereferenced()) - if err != nil { - return err - } - if l != nil { - releaseLease(l, m) - } - return nil -} - // removeOnceDereferenced returns true if the Manager thinks // a descriptorVersionState can be removed after its refcount goes to 0. func (m *Manager) removeOnceDereferenced() bool { @@ -1688,7 +1721,7 @@ func (m *Manager) findDescriptorState(id descpb.ID, create bool) *descriptorStat defer m.mu.Unlock() t := m.mu.descriptors[id] if t == nil && create { - t = &descriptorState{id: id, stopper: m.stopper} + t = &descriptorState{m: m, id: id, stopper: m.stopper} m.mu.descriptors[id] = t } return t @@ -1804,7 +1837,8 @@ var leaseRefreshLimit = settings.RegisterIntSetting( // TODO(vivek): Remove once epoch based table leases are implemented. func (m *Manager) PeriodicallyRefreshSomeLeases(ctx context.Context) { _ = m.stopper.RunAsyncTask(ctx, "lease-refresher", func(ctx context.Context) { - if m.storage.leaseDuration <= 0 { + leaseDuration := LeaseDuration.Get(&m.storage.settings.SV) + if leaseDuration <= 0 { return } refreshTimer := timeutil.NewTimer() @@ -1996,16 +2030,16 @@ func (m *Manager) VisitLeases( // context. 
func (m *Manager) TestingAcquireAndAssertMinVersion( ctx context.Context, timestamp hlc.Timestamp, id descpb.ID, minVersion descpb.DescriptorVersion, -) (catalog.Descriptor, hlc.Timestamp, error) { +) (LeasedDescriptor, error) { t := m.findDescriptorState(id, true) if err := ensureVersion(ctx, id, minVersion, m); err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } desc, _, err := t.findForTimestamp(ctx, timestamp) if err != nil { - return nil, hlc.Timestamp{}, err + return nil, err } - return desc.Descriptor, desc.expiration, nil + return desc, nil } // TestingOutstandingLeasesGauge returns the outstanding leases gauge that is diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go index e5225abc5d0d..8c76b57966d1 100644 --- a/pkg/sql/catalog/lease/lease_internal_test.go +++ b/pkg/sql/catalog/lease/lease_internal_test.go @@ -87,14 +87,14 @@ func TestTableSet(t *testing.T) { s := &descriptorVersionState{ Descriptor: tabledesc.NewBuilder(&descpb.TableDescriptor{Version: op.version}).BuildImmutable(), } - s.expiration = hlc.Timestamp{WallTime: op.expiration} + s.mu.expiration = hlc.Timestamp{WallTime: op.expiration} set.insert(s) case remove: s := &descriptorVersionState{ Descriptor: tabledesc.NewBuilder(&descpb.TableDescriptor{Version: op.version}).BuildImmutable(), } - s.expiration = hlc.Timestamp{WallTime: op.expiration} + s.mu.expiration = hlc.Timestamp{WallTime: op.expiration} set.remove(s) case newest: @@ -104,7 +104,7 @@ func TestTableSet(t *testing.T) { } s := "" if n != nil { - s = fmt.Sprintf("%d:%d", n.GetVersion(), n.expiration.WallTime) + s = fmt.Sprintf("%d:%d", n.GetVersion(), n.getExpiration().WallTime) } if d.expected != s { t.Fatalf("%d: expected %s, but found %s", i, d.expected, s) @@ -166,15 +166,13 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if err := leaseManager.AcquireFreshestFromStore(context.Background(), tableDesc.GetID()); err != nil { t.Fatal(err) } - table, exp, err 
:= leaseManager.Acquire(context.Background(), s.Clock().Now(), tableDesc.GetID()) + table, err := leaseManager.Acquire(context.Background(), s.Clock().Now(), tableDesc.GetID()) if err != nil { t.Fatal(err) } - tables = append(tables, table.(catalog.TableDescriptor)) - expiration = exp - if err := leaseManager.Release(table); err != nil { - t.Fatal(err) - } + tables = append(tables, table.Desc().(catalog.TableDescriptor)) + expiration = table.Expiration() + table.Release(context.Background()) } } getLeases() @@ -206,7 +204,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); ts.mu.Lock() correctLease := ts.mu.active.data[0].GetID() == tables[5].GetID() && ts.mu.active.data[0].GetVersion() == tables[5].GetVersion() - correctExpiration := ts.mu.active.data[0].expiration == expiration + correctExpiration := ts.mu.active.data[0].getExpiration() == expiration ts.mu.Unlock() if !correctLease { t.Fatalf("wrong lease survived purge") @@ -220,8 +218,8 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); ts.mu.Lock() tableVersion := &descriptorVersionState{ Descriptor: tables[0], - expiration: tables[5].GetModificationTime(), } + tableVersion.mu.expiration = tables[5].GetModificationTime() ts.mu.active.insert(tableVersion) ts.mu.Unlock() if numLeases := getNumVersions(ts); numLeases != 2 { @@ -284,14 +282,12 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if err := leaseManager.AcquireFreshestFromStore(ctx, tableDesc.GetID()); err != nil { t.Fatal(err) } - table, _, err := leaseManager.Acquire(ctx, futureTime, tableDesc.GetID()) + table, err := leaseManager.Acquire(ctx, futureTime, tableDesc.GetID()) if err != nil { t.Fatal(err) } - latestDesc := table.(catalog.TableDescriptor) - if err := leaseManager.Release(table); err != nil { - t.Fatal(err) - } + latestDesc := table.Desc().(catalog.TableDescriptor) + table.Release(ctx) return latestDesc } origDesc := getLatestDesc() @@ -434,9 +430,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if lease.GetID() != 
tableDesc.GetID() { t.Fatalf("new name has wrong ID: %d (expected: %d)", lease.GetID(), tableDesc.GetID()) } - if err := leaseManager.Release(lease.Descriptor); err != nil { - t.Fatal(err) - } + lease.Release(context.Background()) // Rename to a different database. if _, err := db.Exec("ALTER TABLE t.test2 RENAME TO t1.test2;"); err != nil { @@ -461,9 +455,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if lease.GetID() != tableDesc.GetID() { t.Fatalf("new name has wrong ID: %d (expected: %d)", lease.GetID(), tableDesc.GetID()) } - if err := leaseManager.Release(lease.Descriptor); err != nil { - t.Fatal(err) - } + lease.Release(context.Background()) } // Tests that a name cache entry with by an expired lease is not returned. @@ -494,9 +486,7 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR); if lease := leaseManager.names.get(tableDesc.GetParentID(), tableDesc.GetParentSchemaID(), tableName, s.Clock().Now()); lease == nil { t.Fatalf("name cache has no unexpired entry for (%d, %s)", tableDesc.GetParentID(), tableName) } else { - if err := leaseManager.Release(lease.Descriptor); err != nil { - t.Fatal(err) - } + lease.Release(context.Background()) } leaseManager.ExpireLeases(s.Clock()) @@ -544,7 +534,7 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR); if lease == nil { t.Fatalf("name cache has no unexpired entry for (%d, %s)", tableDesc.GetParentID(), tableName) } - + expiration := lease.Expiration() tracker := removalTracker.TrackRemoval(lease.Descriptor) // Acquire another lease. 
@@ -557,22 +547,19 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR); if newLease == nil { t.Fatalf("name cache doesn't contain entry for (%d, %s)", tableDesc.GetParentID(), tableName) } - if newLease == lease { - t.Fatalf("same lease %s", newLease.expiration.GoTime()) + if newLease.Expiration() == expiration { + t.Fatalf("same lease %s %s", expiration.GoTime(), newLease.Expiration().GoTime()) } - if err := leaseManager.Release(lease.Descriptor); err != nil { - t.Fatal(err) - } + // TODO(ajwerner): does this matter? + lease.Release(context.Background()) // The first lease acquisition was released. if err := tracker.WaitForRemoval(); err != nil { t.Fatal(err) } - if err := leaseManager.Release(lease.Descriptor); err != nil { - t.Fatal(err) - } + newLease.Release(context.Background()) } // Test that table names are treated as case sensitive by the name cache. @@ -634,7 +621,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Populate the name cache. ctx := context.Background() - table, _, err := leaseManager.AcquireByName( + table, err := leaseManager.AcquireByName( ctx, leaseManager.storage.clock.Now(), tableDesc.GetParentID(), @@ -644,27 +631,26 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if err != nil { t.Fatal(err) } - if err := leaseManager.Release(table); err != nil { - t.Fatal(err) - } + table.Release(ctx) // Try to trigger the race repeatedly: race an AcquireByName against a // Release. // tableChan acts as a barrier, synchronizing the two routines at every // iteration. - tableChan := make(chan catalog.TableDescriptor) - errChan := make(chan error) + tableChan := make(chan LeasedDescriptor) + releaseChan := make(chan struct{}) go func() { for table := range tableChan { // Move errors to the main goroutine. 
- errChan <- leaseManager.Release(table) + table.Release(ctx) + releaseChan <- struct{}{} } }() - for i := 0; i < 50; i++ { + for i := 0; i < 1; i++ { timestamp := leaseManager.storage.clock.Now() ctx := context.Background() - desc, _, err := leaseManager.AcquireByName( + desc, err := leaseManager.AcquireByName( ctx, timestamp, tableDesc.GetParentID(), @@ -674,7 +660,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if err != nil { t.Fatal(err) } - table := desc.(catalog.TableDescriptor) + table := desc.Desc().(catalog.TableDescriptor) // This test will need to wait until leases are removed from the store // before creating new leases because the jitter used in the leases' // expiration causes duplicate key errors when trying to create new @@ -685,8 +671,8 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); tracker := removalTracker.TrackRemoval(table) // Start the race: signal the other guy to release, and we do another // acquire at the same time. - tableChan <- table - tableByName, _, err := leaseManager.AcquireByName( + tableChan <- desc + tableByName, err := leaseManager.AcquireByName( ctx, timestamp, tableDesc.GetParentID(), @@ -698,15 +684,10 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); } // See if there was an error releasing lease. - err = <-errChan - if err != nil { - t.Fatal(err) - } + <-releaseChan // Release the lease for the last time. 
- if err := leaseManager.Release(tableByName); err != nil { - t.Fatal(err) - } + tableByName.Release(ctx) // There are 2 possible results of the race above: Either we acquired before // releasing (causing us to acquire the same lease, incrementing and then @@ -753,13 +734,11 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if err := leaseManager.AcquireFreshestFromStore(context.Background(), tableDesc.GetID()); err != nil { t.Error(err) } - table, _, err := leaseManager.Acquire(context.Background(), s.Clock().Now(), tableDesc.GetID()) + table, err := leaseManager.Acquire(context.Background(), s.Clock().Now(), tableDesc.GetID()) if err != nil { t.Error(err) } - if err := leaseManager.Release(table); err != nil { - t.Error(err) - } + table.Release(context.Background()) }() } wg.Wait() @@ -803,13 +782,11 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); for i := 0; i < numRoutines; i++ { go func() { defer wg.Done() - table, _, err := leaseManager.Acquire(context.Background(), now, tableDesc.GetID()) + table, err := leaseManager.Acquire(context.Background(), now, tableDesc.GetID()) if err != nil { t.Error(err) } - if err := leaseManager.Release(table); err != nil { - t.Error(err) - } + table.Release(context.Background()) }() } @@ -843,10 +820,18 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { // Result is a struct for moving results to the main result routine. 
type Result struct { - table catalog.TableDescriptor + table LeasedDescriptor exp hlc.Timestamp err error } + mkResult := func(table LeasedDescriptor, err error) Result { + res := Result{err: err} + if table != nil { + res.table = table + res.exp = table.Expiration() + } + return res + } descID := descpb.ID(keys.LeaseTableID) @@ -856,8 +841,7 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { m *Manager, acquireChan chan Result, ) { - table, e, err := m.Acquire(ctx, m.storage.clock.Now(), descID) - acquireChan <- Result{err: err, exp: e, table: table.(catalog.TableDescriptor)} + acquireChan <- mkResult(m.Acquire(ctx, m.storage.clock.Now(), descID)) } testCases := []struct { @@ -936,12 +920,11 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { serverArgs := base.TestServerArgs{Knobs: testingKnobs} - serverArgs.LeaseManagerConfig = base.NewLeaseManagerConfig() // The LeaseJitterFraction is zero so leases will have // monotonically increasing expiration. This prevents two leases // from having the same expiration due to randomness, as the // leases are checked for having a different expiration. 
- serverArgs.LeaseManagerConfig.DescriptorLeaseJitterFraction = 0.0 + LeaseJitterFraction.Override(&serverArgs.SV, 0) s, _, _ := serverutils.StartServer( t, serverArgs) @@ -956,11 +939,10 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { if test.isSecondCallAcquireFreshest { go func(ctx context.Context, m *Manager, acquireChan chan Result) { if err := m.AcquireFreshestFromStore(ctx, descID); err != nil { - acquireChan <- Result{err: err, exp: hlc.Timestamp{}, table: nil} + acquireChan <- mkResult(nil, err) return } - table, e, err := m.Acquire(ctx, s.Clock().Now(), descID) - acquireChan <- Result{err: err, exp: e, table: table.(catalog.TableDescriptor)} + acquireChan <- mkResult(m.Acquire(ctx, s.Clock().Now(), descID)) }(ctx, leaseManager, acquireResultChan) } else { @@ -986,10 +968,8 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { // Release the lease. This also causes it to get removed as the // knob RemoveOnceDereferenced is set. - tracker := removalTracker.TrackRemoval(result1.table) - if err := leaseManager.Release(result1.table); err != nil { - t.Fatal(err) - } + tracker := removalTracker.TrackRemoval(result1.table.Desc()) + result1.table.Release(ctx) // Wait until the lease is removed. 
if err := tracker.WaitForRemoval(); err != nil { t.Fatal(err) diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 47a6cdfd8d9c..3a9d4c550256 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" @@ -74,13 +75,9 @@ type leaseTest struct { kvDB *kv.DB nodes map[uint32]*lease.Manager leaseManagerTestingKnobs lease.ManagerTestingKnobs - cfg *base.LeaseManagerConfig } func newLeaseTest(tb testing.TB, params base.TestServerArgs) *leaseTest { - if params.LeaseManagerConfig == nil { - params.LeaseManagerConfig = base.NewLeaseManagerConfig() - } s, db, kvDB := serverutils.StartServer(tb, params) leaseTest := &leaseTest{ TB: tb, @@ -88,7 +85,6 @@ func newLeaseTest(tb testing.TB, params base.TestServerArgs) *leaseTest { db: db, kvDB: kvDB, nodes: map[uint32]*lease.Manager{}, - cfg: params.LeaseManagerConfig, } if params.Knobs.SQLLeaseManager != nil { leaseTest.leaseManagerTestingKnobs = @@ -138,41 +134,39 @@ func (t *leaseTest) expectLeases(descID descpb.ID, expected string) { }) } -func (t *leaseTest) acquire( - nodeID uint32, descID descpb.ID, -) (catalog.Descriptor, hlc.Timestamp, error) { +func (t *leaseTest) acquire(nodeID uint32, descID descpb.ID) (lease.LeasedDescriptor, error) { return t.node(nodeID).Acquire(context.Background(), t.server.Clock().Now(), descID) } func (t *leaseTest) acquireMinVersion( nodeID uint32, descID descpb.ID, minVersion descpb.DescriptorVersion, -) (catalog.Descriptor, hlc.Timestamp, error) { +) (lease.LeasedDescriptor, error) { return t.node(nodeID).TestingAcquireAndAssertMinVersion( 
context.Background(), t.server.Clock().Now(), descID, minVersion) + } -func (t *leaseTest) mustAcquire( - nodeID uint32, descID descpb.ID, -) (catalog.Descriptor, hlc.Timestamp) { - table, expiration, err := t.acquire(nodeID, descID) +func (t *leaseTest) mustAcquire(nodeID uint32, descID descpb.ID) lease.LeasedDescriptor { + ld, err := t.acquire(nodeID, descID) if err != nil { t.Fatal(err) } - return table, expiration + return ld } func (t *leaseTest) mustAcquireMinVersion( nodeID uint32, descID descpb.ID, minVersion descpb.DescriptorVersion, -) (catalog.Descriptor, hlc.Timestamp) { - desc, expiration, err := t.acquireMinVersion(nodeID, descID, minVersion) +) lease.LeasedDescriptor { + desc, err := t.acquireMinVersion(nodeID, descID, minVersion) if err != nil { t.Fatal(err) } - return desc, expiration + return desc } -func (t *leaseTest) release(nodeID uint32, desc catalog.Descriptor) error { - return t.node(nodeID).Release(desc) +func (t *leaseTest) release(nodeID uint32, desc lease.LeasedDescriptor) error { + desc.Release(context.Background()) + return nil } // If leaseRemovalTracker is not nil, it will be used to block until the lease is @@ -180,15 +174,13 @@ func (t *leaseTest) release(nodeID uint32, desc catalog.Descriptor) error { // store (i.e. it's not expired and it's not for an old descriptor version), // this shouldn't be set. 
func (t *leaseTest) mustRelease( - nodeID uint32, desc catalog.Descriptor, leaseRemovalTracker *lease.LeaseRemovalTracker, + nodeID uint32, desc lease.LeasedDescriptor, leaseRemovalTracker *lease.LeaseRemovalTracker, ) { var tracker lease.RemovalTracker if leaseRemovalTracker != nil { - tracker = leaseRemovalTracker.TrackRemoval(desc) - } - if err := t.release(nodeID, desc); err != nil { - t.Fatal(err) + tracker = leaseRemovalTracker.TrackRemoval(desc.Desc()) } + desc.Release(context.Background()) if leaseRemovalTracker != nil { if err := tracker.WaitForRemoval(); err != nil { t.Fatal(err) @@ -233,7 +225,6 @@ func (t *leaseTest) node(nodeID uint32) *lease.Manager { t.leaseManagerTestingKnobs, t.server.Stopper(), cfgCpy.RangeFeedFactory, - t.cfg, ) ctx := logtags.AddTag(context.Background(), "leasemgr", nodeID) mgr.PeriodicallyRefreshSomeLeases(ctx) @@ -245,7 +236,7 @@ func (t *leaseTest) node(nodeID uint32) *lease.Manager { func TestLeaseManager(testingT *testing.T) { defer leaktest.AfterTest(testingT)() removalTracker := lease.NewLeaseRemovalTracker() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ LeaseStoreTestingKnobs: lease.StorageTestingKnobs{ @@ -261,23 +252,19 @@ func TestLeaseManager(testingT *testing.T) { // We can't acquire a lease on a non-existent table. expected := "descriptor not found" - if _, _, err := t.acquire(1, 10000); !testutils.IsError(err, expected) { + if _, err := t.acquire(1, 10000); !testutils.IsError(err, expected) { t.Fatalf("expected %s, but found %v", expected, err) } // Acquire 2 leases from the same node. They should return the same // table and expiration. 
- l1, e1 := t.mustAcquire(1, descID) - l2, e2 := t.mustAcquire(1, descID) - if l1.GetID() != l2.GetID() { + l1 := t.mustAcquire(1, descID) + l2 := t.mustAcquire(1, descID) + if l1.Desc().GetID() != l2.Desc().GetID() { t.Fatalf("expected same lease, but found %v != %v", l1, l2) - } else if e1 != e2 { + } else if e1, e2 := l1.Expiration(), l2.Expiration(); e1 != e2 { t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") - // Node 2 never acquired a lease on descID, so we should expect an error. - if err := t.release(2, l1); err == nil { - t.Fatalf("expected error, but found none") - } t.mustRelease(1, l1, nil) t.mustRelease(1, l2, nil) t.expectLeases(descID, "/1/1") @@ -285,15 +272,15 @@ func TestLeaseManager(testingT *testing.T) { // It is an error to acquire a lease for a specific version that doesn't // exist yet. expected = "version 2 for descriptor lease does not exist yet" - if _, _, err := t.acquireMinVersion(1, descID, 2); !testutils.IsError(err, expected) { + if _, err := t.acquireMinVersion(1, descID, 2); !testutils.IsError(err, expected) { t.Fatalf("expected %s, but found %v", expected, err) } t.expectLeases(descID, "/1/1") // Publish a new version and explicitly acquire it. - l2, _ = t.mustAcquire(1, descID) + l2 = t.mustAcquire(1, descID) t.mustPublish(ctx, 1, descID) - l3, _ := t.mustAcquireMinVersion(1, descID, 2) + l3 := t.mustAcquireMinVersion(1, descID, 2) t.expectLeases(descID, "/1/1 /2/1") // When the last local reference on the new version is released we don't @@ -303,7 +290,7 @@ func TestLeaseManager(testingT *testing.T) { // We can still acquire a local reference on the old version since it hasn't // expired. - l4, _ := t.mustAcquireMinVersion(1, descID, 1) + l4 := t.mustAcquireMinVersion(1, descID, 1) t.mustRelease(1, l4, nil) t.expectLeases(descID, "/1/1 /2/1") @@ -313,8 +300,8 @@ func TestLeaseManager(testingT *testing.T) { t.expectLeases(descID, "/2/1") // Acquire 2 node leases on version 2. 
- l5, _ := t.mustAcquireMinVersion(1, descID, 2) - l6, _ := t.mustAcquireMinVersion(2, descID, 2) + l5 := t.mustAcquireMinVersion(1, descID, 2) + l6 := t.mustAcquireMinVersion(2, descID, 2) // Publish version 3. This will succeed immediately. t.mustPublish(ctx, 3, descID) @@ -328,8 +315,8 @@ func TestLeaseManager(testingT *testing.T) { }() // Force both nodes ahead to version 3. - l7, _ := t.mustAcquireMinVersion(1, descID, 3) - l8, _ := t.mustAcquireMinVersion(2, descID, 3) + l7 := t.mustAcquireMinVersion(1, descID, 3) + l8 := t.mustAcquireMinVersion(2, descID, 3) t.expectLeases(descID, "/2/1 /2/2 /3/1 /3/2") t.mustRelease(1, l5, removalTracker) @@ -339,7 +326,7 @@ func TestLeaseManager(testingT *testing.T) { // Wait for version 4 to be published. wg.Wait() - l9, _ := t.mustAcquireMinVersion(1, descID, 4) + l9 := t.mustAcquireMinVersion(1, descID, 4) t.mustRelease(1, l7, removalTracker) t.mustRelease(2, l8, nil) t.expectLeases(descID, "/3/2 /4/1") @@ -347,14 +334,19 @@ func TestLeaseManager(testingT *testing.T) { t.expectLeases(descID, "/3/2 /4/1") } +func createTestServerParams() base.TestServerArgs { + params, _ := tests.CreateTestServerParams() + params.Settings = cluster.MakeTestingClusterSettings() + return params +} + func TestLeaseManagerReacquire(testingT *testing.T) { defer leaktest.AfterTest(testingT)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() - params.LeaseManagerConfig = base.NewLeaseManagerConfig() // Set the lease duration such that the next lease acquisition will // require the lease to be reacquired. 
- params.LeaseManagerConfig.DescriptorLeaseDuration = 0 + lease.LeaseDuration.Override(¶ms.SV, 0) removalTracker := lease.NewLeaseRemovalTracker() params.Knobs = base.TestingKnobs{ @@ -369,13 +361,15 @@ func TestLeaseManagerReacquire(testingT *testing.T) { const descID = keys.LeaseTableID - l1, e1 := t.mustAcquire(1, descID) + l1 := t.mustAcquire(1, descID) t.expectLeases(descID, "/1/1") + e1 := l1.Expiration() // Another lease acquisition from the same node will result in a new lease. - rt := removalTracker.TrackRemoval(l1) - l3, e3 := t.mustAcquire(1, descID) - if l1.GetID() == l3.GetID() && e3.WallTime == e1.WallTime { + rt := removalTracker.TrackRemoval(l1.Desc()) + l3 := t.mustAcquire(1, descID) + e3 := l3.Expiration() + if l1.Desc().GetID() == l3.Desc().GetID() && e3.WallTime == e1.WallTime { t.Fatalf("expected different leases, but found %v", l1) } if e3.WallTime < e1.WallTime { @@ -395,7 +389,7 @@ func TestLeaseManagerReacquire(testingT *testing.T) { func TestLeaseManagerPublishVersionChanged(testingT *testing.T) { defer leaktest.AfterTest(testingT)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() t := newLeaseTest(testingT, params) defer t.cleanup() @@ -458,7 +452,7 @@ func TestLeaseManagerPublishVersionChanged(testingT *testing.T) { func TestLeaseManagerPublishIllegalVersionChange(testingT *testing.T) { defer leaktest.AfterTest(testingT)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() t := newLeaseTest(testingT, params) defer t.cleanup() @@ -482,7 +476,7 @@ func TestLeaseManagerPublishIllegalVersionChange(testingT *testing.T) { func TestLeaseManagerDrain(testingT *testing.T) { defer leaktest.AfterTest(testingT)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() leaseRemovalTracker := lease.NewLeaseRemovalTracker() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ @@ -497,20 +491,20 @@ func TestLeaseManagerDrain(testingT 
*testing.T) { const descID = keys.LeaseTableID { - l1, _ := t.mustAcquire(1, descID) - l2, _ := t.mustAcquire(2, descID) + l1 := t.mustAcquire(1, descID) + l2 := t.mustAcquire(2, descID) t.mustRelease(1, l1, nil) t.expectLeases(descID, "/1/1 /1/2") // Removal tracker to track for node 1's lease removal once the node // starts draining. - l1RemovalTracker := leaseRemovalTracker.TrackRemoval(l1) + l1RemovalTracker := leaseRemovalTracker.TrackRemoval(l1.Desc()) t.nodes[1].SetDraining(true, nil /* reporter */) t.nodes[2].SetDraining(true, nil /* reporter */) // Leases cannot be acquired when in draining mode. - if _, _, err := t.acquire(1, descID); !testutils.IsError(err, "cannot acquire lease when draining") { + if _, err := t.acquire(1, descID); !testutils.IsError(err, "cannot acquire lease when draining") { t.Fatalf("unexpected error: %v", err) } @@ -531,7 +525,7 @@ func TestLeaseManagerDrain(testingT *testing.T) { // Check that leases with a refcount of 0 are correctly kept in the // store once the drain mode has been exited. t.nodes[1].SetDraining(false, nil /* reporter */) - l1, _ := t.mustAcquire(1, descID) + l1 := t.mustAcquire(1, descID) t.mustRelease(1, l1, nil) t.expectLeases(descID, "/1/1") } @@ -544,7 +538,7 @@ func TestCantLeaseDeletedTable(testingT *testing.T) { var mu syncutil.Mutex clearSchemaChangers := false - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ SchemaChangeJobNoOp: func() bool { @@ -585,7 +579,7 @@ CREATE TABLE test.t(a INT PRIMARY KEY); tableDesc := catalogkv.TestingGetTableDescriptor(t.kvDB, keys.SystemSQLCodec, "test", "t") // try to acquire at a bogus version to make sure we don't get back a lease we // already had. 
- _, _, err = t.acquireMinVersion(1, tableDesc.GetID(), tableDesc.GetVersion()+1) + _, err = t.acquireMinVersion(1, tableDesc.GetID(), tableDesc.GetVersion()+1) if !testutils.IsError(err, "descriptor is being dropped") { t.Fatalf("got a different error than expected: %v", err) } @@ -593,7 +587,7 @@ CREATE TABLE test.t(a INT PRIMARY KEY); func acquire( ctx context.Context, s *server.TestServer, descID descpb.ID, -) (catalog.Descriptor, hlc.Timestamp, error) { +) (lease.LeasedDescriptor, error) { return s.LeaseManager().(*lease.Manager).Acquire(ctx, s.Clock().Now(), descID) } @@ -609,7 +603,7 @@ func TestLeasesOnDeletedTableAreReleasedImmediately(t *testing.T) { var waitTableID descpb.ID deleted := make(chan bool) - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ TestingDescriptorRefreshedEvent: func(descriptor *descpb.Descriptor) { @@ -649,11 +643,11 @@ CREATE TABLE test.t(a INT PRIMARY KEY); tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t") ctx := context.Background() - lease1, _, err := acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) + lease1, err := acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) if err != nil { t.Fatal(err) } - lease2, _, err := acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) + lease2, err := acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) if err != nil { t.Fatal(err) } @@ -676,23 +670,18 @@ CREATE TABLE test.t(a INT PRIMARY KEY); <-deleted // We should still be able to acquire, because we have an active lease. - lease3, _, err := acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) + lease3, err := acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) if err != nil { t.Fatal(err) } // Release everything. 
- if err := s.LeaseManager().(*lease.Manager).Release(lease1); err != nil { - t.Fatal(err) - } - if err := s.LeaseManager().(*lease.Manager).Release(lease2); err != nil { - t.Fatal(err) - } - if err := s.LeaseManager().(*lease.Manager).Release(lease3); err != nil { - t.Fatal(err) - } + lease1.Release(ctx) + lease2.Release(ctx) + lease3.Release(ctx) + // Now we shouldn't be able to acquire any more. - _, _, err = acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) + _, err = acquire(ctx, s.(*server.TestServer), tableDesc.GetID()) if !testutils.IsError(err, "descriptor is being dropped") { t.Fatalf("got a different error than expected: %v", err) } @@ -702,7 +691,7 @@ CREATE TABLE test.t(a INT PRIMARY KEY); // properly tracked and released. func TestSubqueryLeases(t *testing.T) { defer leaktest.AfterTest(t)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() fooRelease := make(chan struct{}, 10) fooAcquiredCount := int32(0) @@ -782,7 +771,7 @@ SELECT EXISTS(SELECT * FROM t.foo); // Test that an AS OF SYSTEM TIME query uses the table cache. func TestAsOfSystemTimeUsesCache(t *testing.T) { defer leaktest.AfterTest(t)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() fooAcquiredCount := int32(0) @@ -836,7 +825,7 @@ func TestDescriptorRefreshOnRetry(t *testing.T) { skip.WithIssue(t, 50037) - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() fooAcquiredCount := int32(0) fooReleaseCount := int32(0) @@ -925,7 +914,7 @@ CREATE TABLE t.foo (v INT); // table descriptor. func TestTxnObeysTableModificationTime(t *testing.T) { defer leaktest.AfterTest(t)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.Background()) @@ -1140,7 +1129,7 @@ INSERT INTO t.kv VALUES ('a', 'b'); // version of a descriptor. 
func TestLeaseAtLatestVersion(t *testing.T) { defer leaktest.AfterTest(t)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() errChan := make(chan error, 1) params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ @@ -1220,7 +1209,7 @@ INSERT INTO t.timestamp VALUES ('a', 'b'); // parallelism, which is important to also benchmark locking. func BenchmarkLeaseAcquireByNameCached(b *testing.B) { defer leaktest.AfterTest(b)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() t := newLeaseTest(b, params) defer t.cleanup() @@ -1238,7 +1227,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); leaseManager := t.node(1) // Acquire the lease so it is put into the nameCache. - _, _, err := leaseManager.AcquireByName( + _, err := leaseManager.AcquireByName( context.Background(), t.server.Clock().Now(), dbID, @@ -1253,7 +1242,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, _, err := leaseManager.AcquireByName( + _, err := leaseManager.AcquireByName( context.Background(), t.server.Clock().Now(), dbID, @@ -1278,13 +1267,16 @@ func TestLeaseRenewedAutomatically(testingT *testing.T) { var testAcquiredCount int32 var testAcquisitionBlockCount int32 - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ LeaseStoreTestingKnobs: lease.StorageTestingKnobs{ // We want to track when leases get acquired and when they are renewed. // We also want to know when acquiring blocks to test lease renewal. 
- LeaseAcquiredEvent: func(desc catalog.Descriptor, _ error) { + LeaseAcquiredEvent: func(desc catalog.Descriptor, err error) { + if err != nil { + return + } if desc.GetID() > keys.MaxReservedDescID { atomic.AddInt32(&testAcquiredCount, 1) } @@ -1295,14 +1287,13 @@ func TestLeaseRenewedAutomatically(testingT *testing.T) { }, }, } - params.LeaseManagerConfig = base.NewLeaseManagerConfig() // The lease jitter is set to ensure newer leases have higher // expiration timestamps. - params.LeaseManagerConfig.DescriptorLeaseJitterFraction = 0.0 + lease.LeaseJitterFraction.Override(¶ms.SV, 0) // The renewal timeout is set to be the duration, so background // renewal should begin immediately after accessing a lease. - params.LeaseManagerConfig.DescriptorLeaseRenewalTimeout = - params.LeaseManagerConfig.DescriptorLeaseDuration + lease.LeaseRenewalDuration.Override(¶ms.SV, + lease.LeaseDuration.Get(¶ms.SV)) ctx := context.Background() t := newLeaseTest(testingT, params) @@ -1321,13 +1312,14 @@ CREATE TABLE t.test2 (); dbID := test2Desc.GetParentID() // Acquire a lease on test1 by name. - ts1, eo1, err := t.node(1).AcquireByName( + ts1, err := t.node(1).AcquireByName( ctx, t.server.Clock().Now(), dbID, test1Desc.GetParentSchemaID(), "test1", ) + eo1 := ts1.Expiration() if err != nil { t.Fatal(err) } else if err := t.release(1, ts1); err != nil { @@ -1338,7 +1330,7 @@ CREATE TABLE t.test2 (); } // Acquire a lease on test2 by ID. - ts2, eo2, err := t.node(1).Acquire(ctx, t.server.Clock().Now(), test2Desc.GetID()) + ts2, err := t.node(1).Acquire(ctx, t.server.Clock().Now(), test2Desc.GetID()) if err != nil { t.Fatal(err) } else if err := t.release(1, ts2); err != nil { @@ -1347,6 +1339,7 @@ CREATE TABLE t.test2 (); t.Fatalf("expected 2 leases to be acquired, but acquired %d times", count) } + eo2 := ts2.Expiration() // Reset testAcquisitionBlockCount as the first acqusition will always block. 
atomic.StoreInt32(&testAcquisitionBlockCount, 0) @@ -1355,7 +1348,7 @@ CREATE TABLE t.test2 (); // Acquire another lease by name on test1. At first this will be the // same lease, but eventually we will asynchronously renew a lease and // our acquire will get a newer lease. - ts1, en1, err := t.node(1).AcquireByName( + ts1, err := t.node(1).AcquireByName( ctx, t.server.Clock().Now(), dbID, @@ -1365,6 +1358,7 @@ CREATE TABLE t.test2 (); if err != nil { t.Fatal(err) } + en1 := ts1.Expiration() defer func() { if err := t.release(1, ts1); err != nil { t.Fatal(err) @@ -1387,10 +1381,11 @@ CREATE TABLE t.test2 (); // Acquire another lease by ID on test2. At first this will be the same // lease, but eventually we will asynchronously renew a lease and our // acquire will get a newer lease. - ts2, en2, err := t.node(1).Acquire(ctx, t.server.Clock().Now(), test2Desc.GetID()) + ts2, err := t.node(1).Acquire(ctx, t.server.Clock().Now(), test2Desc.GetID()) if err != nil { t.Fatal(err) } + en2 := ts2.Expiration() defer func() { if err := t.release(1, ts2); err != nil { t.Fatal(err) @@ -1424,7 +1419,7 @@ CREATE TABLE t.test2 (); func TestIncrementTableVersion(t *testing.T) { defer leaktest.AfterTest(t)() var violations int64 - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ // Disable execution of schema changers after the schema change // transaction commits. This is to prevent executing the default @@ -1526,7 +1521,7 @@ CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR); func TestTwoVersionInvariantRetryError(t *testing.T) { defer leaktest.AfterTest(t)() var violations int64 - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ // Disable execution of schema changers after the schema change // transaction commits. 
This is to prevent executing the default @@ -1637,7 +1632,7 @@ func TestModificationTimeTxnOrdering(testingT *testing.T) { // Which table to exercise the test against. const descID = keys.LeaseTableID - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() t := newLeaseTest(testingT, params) defer t.cleanup() @@ -1724,7 +1719,7 @@ func TestLeaseRenewedPeriodically(testingT *testing.T) { var testAcquiredCount int32 var testAcquisitionBlockCount int32 - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ LeaseStoreTestingKnobs: lease.StorageTestingKnobs{ @@ -1752,15 +1747,15 @@ func TestLeaseRenewedPeriodically(testingT *testing.T) { }, }, } - params.LeaseManagerConfig = base.NewLeaseManagerConfig() + // The lease jitter is set to ensure newer leases have higher // expiration timestamps. - params.LeaseManagerConfig.DescriptorLeaseJitterFraction = 0.0 + lease.LeaseJitterFraction.Override(¶ms.SV, 0) // Lease duration to something small. - params.LeaseManagerConfig.DescriptorLeaseDuration = 50 * time.Millisecond + lease.LeaseDuration.Override(¶ms.SV, 50*time.Millisecond) // Renewal timeout to 0 saying that the lease will get renewed only // after the lease expires when a request requests the descriptor. - params.LeaseManagerConfig.DescriptorLeaseRenewalTimeout = 0 + lease.LeaseRenewalDuration.Override(¶ms.SV, 0) ctx := context.Background() t := newLeaseTest(testingT, params) @@ -1790,7 +1785,7 @@ CREATE TABLE t.test2 (); } // Acquire a lease on test1 by name. - ts1, _, err := t.node(1).AcquireByName( + ts1, err := t.node(1).AcquireByName( ctx, t.server.Clock().Now(), dbID, @@ -1807,7 +1802,7 @@ CREATE TABLE t.test2 (); } // Acquire a lease on test2 by ID. 
- ts2, _, err := t.node(1).Acquire(ctx, t.server.Clock().Now(), test2Desc.GetID()) + ts2, err := t.node(1).Acquire(ctx, t.server.Clock().Now(), test2Desc.GetID()) if err != nil { t.Fatal(err) } else if err := t.release(1, ts2); err != nil { @@ -1845,7 +1840,7 @@ CREATE TABLE t.test2 (); // initiated before the table is dropped succeeds. func TestReadBeforeDrop(t *testing.T) { defer leaktest.AfterTest(t)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.Background()) @@ -1893,10 +1888,8 @@ INSERT INTO t.kv VALUES ('a', 'b'); // of a TABLE CREATE are pushed to allow them to observe the created table. func TestTableCreationPushesTxnsInRecentPast(t *testing.T) { defer leaktest.AfterTest(t)() - params, _ := tests.CreateTestServerParams() tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, - ServerArgs: params, }) defer tc.Stopper().Stop(context.Background()) sqlDB := tc.ServerConn(0) @@ -1954,7 +1947,7 @@ INSERT INTO t.kv VALUES ('c', 'd'); func TestDeleteOrphanedLeases(testingT *testing.T) { defer leaktest.AfterTest(testingT)() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{}, } @@ -1976,7 +1969,7 @@ CREATE TABLE t.after (k CHAR PRIMARY KEY, v CHAR); dbID := beforeDesc.GetParentID() // Acquire a lease on "before" by name. - beforeTable, _, err := t.node(1).AcquireByName( + beforeTable, err := t.node(1).AcquireByName( ctx, t.server.Clock().Now(), dbID, @@ -1994,7 +1987,7 @@ CREATE TABLE t.after (k CHAR PRIMARY KEY, v CHAR); now := timeutil.Now().UnixNano() // Acquire a lease on "after" by name after server startup. 
- afterTable, _, err := t.node(1).AcquireByName( + afterTable, err := t.node(1).AcquireByName( ctx, t.server.Clock().Now(), dbID, @@ -2022,7 +2015,7 @@ CREATE TABLE t.after (k CHAR PRIMARY KEY, v CHAR); func TestLeaseAcquisitionDoesntBlock(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() s, db, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -2056,11 +2049,11 @@ func TestLeaseAcquisitionDoesntBlock(t *testing.T) { require.NoError(t, <-schemaCh) - l, _, err := s.LeaseManager().(*lease.Manager).Acquire(ctx, s.Clock().Now(), descID) + l, err := s.LeaseManager().(*lease.Manager).Acquire(ctx, s.Clock().Now(), descID) require.NoError(t, err) // Release the lease so that the schema change can proceed. - require.NoError(t, s.LeaseManager().(*lease.Manager).Release(l)) + l.Release(ctx) // Unblock the schema change. close(schemaUnblock) @@ -2075,7 +2068,7 @@ func TestLeaseAcquisitionDoesntBlock(t *testing.T) { func TestLeaseAcquisitionByNameDoesntBlock(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - params, _ := tests.CreateTestServerParams() + params := createTestServerParams() s, db, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -2855,3 +2848,187 @@ CREATE TABLE d1.t2 (name int); require.NoError(t, <-errorChan) close(errorChan) } + +// Validates that the transaction deadline can be extended +// past the original lease duration. Previously, we had +// a limitation if the transaction took longer than +// the lease, the transaction would fail because of the +// deadline. 
+func TestLeaseTxnDeadlineExtension(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + filterMu := syncutil.Mutex{} + blockTxn := make(chan struct{}) + blockedOnce := false + var txnID string + + params := createTestServerParams() + // Set the lease duration such that the next lease acquisition will + // require the lease to be reacquired. + lease.LeaseDuration.Override(¶ms.SV, 0) + params.Knobs.Store = &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { + filterMu.Lock() + // Wait for a commit with the txnID, and only allows + // it to resume when the channel gets unblocked. + if req.Txn != nil && req.Txn.ID.String() == txnID { + filterMu.Unlock() + // There will only be a single EndTxn request in + // flight due to the transaction ID filter and + // blocked once flag, so no mutex is needed here. + if req.IsSingleEndTxnRequest() && !blockedOnce { + <-blockTxn + blockedOnce = true + } + } else { + filterMu.Unlock() + } + return nil + }, + } + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + // Setup tables for the test. + _, err := conn.Exec(` +CREATE TABLE t1(val int); + `) + require.NoError(t, err) + // Validates that transaction deadlines can move forward into + // the future after lease expiry. + t.Run("validate-lease-txn-deadline-ext", func(t *testing.T) { + conn, err := tc.ServerConn(0).Conn(ctx) + require.NoError(t, err) + descModConn := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + waitChan := make(chan error) + resumeChan := make(chan struct{}) + go func() { + ctx = context.Background() + // Start a transaction that will lease out a table, + // and let the lease duration expire. 
+ _, err := conn.ExecContext(ctx, ` +BEGIN; +SELECT * FROM t1; + `) + if err != nil { + waitChan <- err + return + } + // Fetch the transaction ID, so that we can delay the commit + txnIDResult := conn.QueryRowContext(ctx, `SELECT id FROM crdb_internal.node_transactions WHERE session_id IN (SELECT * FROM [SHOW session_id]);`) + if txnIDResult.Err() != nil { + waitChan <- txnIDResult.Err() + return + } + filterMu.Lock() + err = txnIDResult.Scan(&txnID) + blockedOnce = false + filterMu.Unlock() + if err != nil { + waitChan <- err + return + } + // Inform the main routine that it can cause an operation + // to block us. + waitChan <- nil + <-resumeChan + // Execute an insert once the other transaction + // gets a lease. The lease renewal should adjust + // our deadline. + _, err = conn.ExecContext(ctx, ` +INSERT INTO t1 VALUES (1); +COMMIT;`, + ) + waitChan <- err + }() + + // Wait for the TXN ID and hook to be setup. + err = <-waitChan + require.NoError(t, err) + // Issue a select from a different connection that will + // need a lease. + descModConn.Exec(t, ` +SELECT * FROM T1;`) + resumeChan <- struct{}{} + blockTxn <- struct{}{} + err = <-waitChan + require.NoError(t, err) + }) + + // Validates that the transaction deadline extension can be blocked, + // if the lease can't be renewed, for example if the descriptor gets + // modified. + t.Run("validate-lease-txn-deadline-ext-blocked", func(t *testing.T) { + conn, err := tc.ServerConn(0).Conn(ctx) + require.NoError(t, err) + descModConn := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + waitChan := make(chan error) + resumeChan := make(chan struct{}) + go func() { + ctx = context.Background() + // Start a transaction that will lease out a table, + // and let the lease duration expire. 
+ _, err := conn.ExecContext(ctx, ` +BEGIN; +SELECT * FROM t1; + `) + if err != nil { + waitChan <- err + return + } + // Fetch the transaction ID, so that we can delay the commit + txnIDResult := conn.QueryRowContext(ctx, `SELECT id FROM crdb_internal.node_transactions WHERE session_id IN (SELECT * FROM [SHOW session_id]);`) + if txnIDResult.Err() != nil { + waitChan <- txnIDResult.Err() + return + } + filterMu.Lock() + err = txnIDResult.Scan(&txnID) + blockedOnce = false + filterMu.Unlock() + if err != nil { + waitChan <- err + return + } + // Inform the main routine that it can cause an operation + // to block us. + waitChan <- nil + <-resumeChan + // Execute an insert on the same connection and attempt + // to commit, this operation will fail. + _, err = conn.ExecContext(ctx, ` +INSERT INTO t1 VALUES (1);`, + ) + if err != nil { + waitChan <- err + return + } + _, err = conn.ExecContext(ctx, ` +COMMIT;`, + ) + if err == nil { + err = errors.New("Failing did not get expected error") + } else if !testutils.IsError(err, "pq: restart transaction: TransactionRetryWithProtoRefreshError: TransactionRetryError: retry txn \\(RETRY_COMMIT_DEADLINE_EXCEEDED -.*") { + err = errors.Wrap(err, "Failed unexpected error") + } else { + err = nil + } + waitChan <- err + }() + + // Wait for the TXN ID and hook to be setup. + err = <-waitChan + require.NoError(t, err) + // Issue an alter column on a different connection, which + // will require a lease. 
+ descModConn.Exec(t, ` +ALTER TABLE T1 ALTER COLUMN VAL SET DEFAULT 5; +SELECT * FROM T1`) + resumeChan <- struct{}{} + blockTxn <- struct{}{} + err = <-waitChan + require.NoError(t, err) + }) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 05c55efb86a4..c37393eff165 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1439,7 +1439,6 @@ func (ex *connExecutor) run( ex.server.cfg.SessionRegistry.register(ex.sessionID, ex) ex.planner.extendedEvalCtx.setSessionID(ex.sessionID) defer ex.server.cfg.SessionRegistry.deregister(ex.sessionID) - for { ex.curStmtAST = nil if err := ctx.Err(); err != nil { @@ -1495,7 +1494,6 @@ func (ex *connExecutor) execCmd(ctx context.Context) error { var ev fsm.Event var payload fsm.EventPayload var res ResultBase - switch tcmd := cmd.(type) { case ExecStmt: ex.phaseTimes[sessionQueryReceived] = tcmd.TimeReceived @@ -1541,7 +1539,6 @@ func (ex *connExecutor) execCmd(ctx context.Context) error { case ExecPortal: // ExecPortal is handled like ExecStmt, except that the placeholder info // is taken from the portal. 
- ex.phaseTimes[sessionQueryReceived] = tcmd.TimeReceived // When parsing has been done earlier, via a separate parse // message, it is not any more part of the statistics collected @@ -2312,12 +2309,17 @@ func (ex *connExecutor) resetPlanner( func (ex *connExecutor) txnStateTransitionsApplyWrapper( ev fsm.Event, payload fsm.EventPayload, res ResultBase, pos CmdPos, ) (advanceInfo, error) { + var implicitTxn bool + txnIsOpen := false if os, ok := ex.machine.CurState().(stateOpen); ok { implicitTxn = os.ImplicitTxn.Get() + txnIsOpen = true } + ex.mu.Lock() err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload) + ex.mu.Unlock() if err != nil { if errors.HasType(err, (*fsm.TransitionNotFoundError)(nil)) { panic(err) @@ -2330,16 +2332,22 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( if advInfo.code == rewind { ex.extraTxnState.autoRetryCounter++ } - // Handle transaction events which cause updates to txnState. switch advInfo.txnEvent { case noEvent: + // Update the deadline on the transaction based on the collections, + // if the transaction is currently open. + if txnIsOpen { + err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ex.Ctx(), ex.state.mu.txn) + if err != nil { + return advanceInfo{}, err + } + } case txnStart: ex.extraTxnState.autoRetryCounter = 0 ex.extraTxnState.onTxnFinish, ex.extraTxnState.onTxnRestart = ex.recordTransactionStart() // Bump the txn counter for logging. 
ex.extraTxnState.txnCounter++ - case txnCommit: if res.Err() != nil { err := errorutil.UnexpectedWithIssueErrorf( @@ -2406,7 +2414,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( return advanceInfo{}, errors.AssertionFailedf( "unexpected event: %v", errors.Safe(advInfo.txnEvent)) } - return advInfo, nil } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index f34f41434a38..cfed8b9c728c 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -251,6 +251,12 @@ func (ex *connExecutor) execStmtInOpenState( var stmt Statement queryID := ex.generateID() + // Update the deadline on the transaction based on the collections. + err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn) + if err != nil { + return nil, nil, err + } + if prepared != nil { stmt = makeStatementFromPrepared(prepared, queryID) } else { diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index 0e9fa7d2a5a6..c4f4c74094c6 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -73,7 +73,6 @@ func (ex *connExecutor) execPrepare( } } } - stmt := makeStatement(parseCmd.Statement, ex.generateID()) ps, err := ex.addPreparedStmt( ctx, @@ -227,10 +226,10 @@ func (ex *connExecutor) prepare( if err := ex.server.cfg.DB.Txn(ctx, prepare); err != nil { return nil, err } - // Prepare is an implicit transaction and we know it with any - // other descriptors. Once the implicit transaction is done - // we can safely drop the leases, since the collections object - // will be shared. + // Prepare with an implicit transaction will end up creating + // a new transaction. Once this transaction is complete, + // we can safely release the leases, otherwise we will + // incorrectly hold leases for later operations. 
ex.extraTxnState.descCollection.ReleaseAll(ctx) } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 901948c1baf3..5209531987c0 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -92,7 +92,6 @@ func TestSchemaChangeProcess(t *testing.T) { var id = descpb.ID(keys.MinNonPredefinedUserDescID + 1 /* skip over DB ID */) var instance = base.SQLInstanceID(2) stopper := stop.NewStopper() - cfg := base.NewLeaseManagerConfig() execCfg := s.ExecutorConfig().(sql.ExecutorConfig) rf, err := rangefeed.NewFactory(stopper, kvDB, nil /* knobs */) require.NoError(t, err) @@ -107,7 +106,6 @@ func TestSchemaChangeProcess(t *testing.T) { lease.ManagerTestingKnobs{}, stopper, rf, - cfg, ) jobRegistry := s.JobRegistry().(*jobs.Registry) defer stopper.Stop(context.Background()) @@ -1403,13 +1401,11 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); // Grab a lease at the latest version so that we are confident // that all future leases will be taken at the latest version. - table, _, err := leaseMgr.TestingAcquireAndAssertMinVersion(ctx, s.Clock().Now(), id, version+1) + table, err := leaseMgr.TestingAcquireAndAssertMinVersion(ctx, s.Clock().Now(), id, version+1) if err != nil { t.Error(err) } - if err := leaseMgr.Release(table); err != nil { - t.Error(err) - } + table.Release(ctx) } // Bulk insert.