Skip to content

Commit

Permalink
kv,sql: transaction commit deadlines due to leases should be late-bound
Browse files Browse the repository at this point in the history
Previously, SQL statements, during their execution, lease
tables from the lease.Manager indirectly through the
connExecutor's descs.Collection. Whenever a lease is used,
the deadline of that version of that lease is applied to
the transaction as a commit deadline. This had the unfortunate
side effect that renewals would not be properly picked up, so
if a transaction took longer then the 4 to 5 minute validity
period, then we can get into problematic scenarios.

To address, this is patch, adds lease tracking through the
the table collections, so that we are aware when extension
occurs, and uses that to figure out the latest time a given
transaction can commit.

Fixes #51042

Release note (bug fix): Fixed a bug that prevent transactions which lasted
longer than 5 minutes and then performed writes from committing.
  • Loading branch information
fqazi committed Apr 28, 2021
1 parent ea92406 commit dbcda1f
Show file tree
Hide file tree
Showing 21 changed files with 779 additions and 592 deletions.
25 changes: 0 additions & 25 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,28 +596,3 @@ func TempStorageConfigFromEnv(
Settings: st,
}
}

// LeaseManagerConfig holds lease manager parameters.
type LeaseManagerConfig struct {
// DescriptorLeaseDuration is the mean duration a lease will be
// acquired for.
DescriptorLeaseDuration time.Duration

// DescriptorLeaseJitterFraction is the factor that we use to
// randomly jitter the lease duration when acquiring a new lease and
// the lease renewal timeout.
DescriptorLeaseJitterFraction float64

// DefaultDescriptorLeaseRenewalTimeout is the default time
// before a lease expires when acquisition to renew the lease begins.
DescriptorLeaseRenewalTimeout time.Duration
}

// NewLeaseManagerConfig initializes a LeaseManagerConfig with default values.
func NewLeaseManagerConfig() *LeaseManagerConfig {
return &LeaseManagerConfig{
DescriptorLeaseDuration: DefaultDescriptorLeaseDuration,
DescriptorLeaseJitterFraction: DefaultDescriptorLeaseJitterFraction,
DescriptorLeaseRenewalTimeout: DefaultDescriptorLeaseRenewalTimeout,
}
}
3 changes: 0 additions & 3 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ type TestServerArgs struct {
*cluster.Settings
RaftConfig

// LeaseManagerConfig holds configuration values specific to the LeaseManager.
LeaseManagerConfig *LeaseManagerConfig

// PartOfCluster must be set if the TestServer is joining others in a cluster.
// If not set (and hence the server is the only one in the cluster), the
// default zone config will be overridden to disable all replication - so that
Expand Down
90 changes: 45 additions & 45 deletions pkg/bench/ddl_analysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@ exp,benchmark
5,AlterRole/alter_role_with_1_option
6,AlterRole/alter_role_with_2_options
8,AlterRole/alter_role_with_3_options
17,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint
17,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints
17,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints
17,AlterTableAddColumn/alter_table_add_1_column
17,AlterTableAddColumn/alter_table_add_2_columns
17,AlterTableAddColumn/alter_table_add_3_columns
25,AlterTableAddForeignKey/alter_table_add_1_foreign_key
33,AlterTableAddForeignKey/alter_table_add_2_foreign_keys
41,AlterTableAddForeignKey/alter_table_add_3_foreign_keys
25,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns
19,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint
19,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints
19,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints
19,AlterTableAddColumn/alter_table_add_1_column
19,AlterTableAddColumn/alter_table_add_2_columns
19,AlterTableAddColumn/alter_table_add_3_columns
27,AlterTableAddForeignKey/alter_table_add_1_foreign_key
35,AlterTableAddForeignKey/alter_table_add_2_foreign_keys
43,AlterTableAddForeignKey/alter_table_add_3_foreign_keys
27,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns
27,AlterTableConfigureZone/alter_table_configure_zone_5_replicas
27,AlterTableConfigureZone/alter_table_configure_zone_7_replicas_
27,AlterTableConfigureZone/alter_table_configure_zone_ranges
21,AlterTableDropColumn/alter_table_drop_1_column
25,AlterTableDropColumn/alter_table_drop_2_columns
29,AlterTableDropColumn/alter_table_drop_3_columns
18,AlterTableDropConstraint/alter_table_drop_1_check_constraint
19,AlterTableDropConstraint/alter_table_drop_2_check_constraints
20,AlterTableDropConstraint/alter_table_drop_3_check_constraints
23,AlterTableDropColumn/alter_table_drop_1_column
27,AlterTableDropColumn/alter_table_drop_2_columns
31,AlterTableDropColumn/alter_table_drop_3_columns
20,AlterTableDropConstraint/alter_table_drop_1_check_constraint
21,AlterTableDropConstraint/alter_table_drop_2_check_constraints
22,AlterTableDropConstraint/alter_table_drop_3_check_constraints
14-15,AlterTableSplit/alter_table_split_at_1_value
20-21,AlterTableSplit/alter_table_split_at_2_values
26-27,AlterTableSplit/alter_table_split_at_3_values
Expand All @@ -31,27 +31,27 @@ exp,benchmark
7,CreateRole/create_role_with_2_options
8,CreateRole/create_role_with_3_options
6,CreateRole/create_role_with_no_options
19,DropDatabase/drop_database_0_tables
28,DropDatabase/drop_database_1_table
37,DropDatabase/drop_database_2_tables
46,DropDatabase/drop_database_3_tables
21,DropDatabase/drop_database_0_tables
30,DropDatabase/drop_database_1_table
39,DropDatabase/drop_database_2_tables
48,DropDatabase/drop_database_3_tables
13,DropRole/drop_1_role
21,DropRole/drop_2_roles
29,DropRole/drop_3_roles
19,DropSequence/drop_1_sequence
29,DropSequence/drop_2_sequences
39,DropSequence/drop_3_sequences
22,DropTable/drop_1_table
35,DropTable/drop_2_tables
48,DropTable/drop_3_tables
23,DropView/drop_1_view
40,DropView/drop_2_views
57,DropView/drop_3_views
19,Grant/grant_all_on_1_table
22,Grant/grant_all_on_2_tables
25,Grant/grant_all_on_3_tables
19,GrantRole/grant_1_role
22,GrantRole/grant_2_roles
21,DropSequence/drop_1_sequence
31,DropSequence/drop_2_sequences
41,DropSequence/drop_3_sequences
24,DropTable/drop_1_table
37,DropTable/drop_2_tables
50,DropTable/drop_3_tables
25,DropView/drop_1_view
42,DropView/drop_2_views
59,DropView/drop_3_views
21,Grant/grant_all_on_1_table
24,Grant/grant_all_on_2_tables
27,Grant/grant_all_on_3_tables
21,GrantRole/grant_1_role
24,GrantRole/grant_2_roles
2,ORMQueries/activerecord_type_introspection_query
2,ORMQueries/django_table_introspection_1_table
2,ORMQueries/django_table_introspection_4_tables
Expand All @@ -60,19 +60,19 @@ exp,benchmark
2,ORMQueries/pg_class
2,ORMQueries/pg_namespace
2,ORMQueries/pg_type
19,Revoke/revoke_all_on_1_table
22,Revoke/revoke_all_on_2_tables
25,Revoke/revoke_all_on_3_tables
18,RevokeRole/revoke_1_role
20,RevokeRole/revoke_2_roles
21,Revoke/revoke_all_on_1_table
24,Revoke/revoke_all_on_2_tables
27,Revoke/revoke_all_on_3_tables
20,RevokeRole/revoke_1_role
22,RevokeRole/revoke_2_roles
1,SystemDatabaseQueries/select_system.users_with_empty_database_name
1,SystemDatabaseQueries/select_system.users_with_schema_name
2,SystemDatabaseQueries/select_system.users_without_schema_name
24,Truncate/truncate_1_column_0_rows
24,Truncate/truncate_1_column_1_row
24,Truncate/truncate_1_column_2_rows
24,Truncate/truncate_2_column_0_rows
24,Truncate/truncate_2_column_1_rows
24,Truncate/truncate_2_column_2_rows
26,Truncate/truncate_1_column_0_rows
26,Truncate/truncate_1_column_1_row
26,Truncate/truncate_1_column_2_rows
26,Truncate/truncate_2_column_0_rows
26,Truncate/truncate_2_column_1_rows
26,Truncate/truncate_2_column_2_rows
1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
8 changes: 3 additions & 5 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,16 @@ func (c *rowFetcherCache) TableDescForKey(

// Retrieve the target TableDescriptor from the lease manager. No caching
// is attempted because the lease manager does its own caching.
desc, _, err := c.leaseMgr.Acquire(ctx, ts, tableID)
desc, err := c.leaseMgr.Acquire(ctx, ts, tableID)
if err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, MarkRetryableError(err)
}
tableDesc = desc.Desc().(catalog.TableDescriptor)
// Immediately release the lease, since we only need it for the exact
// timestamp requested.
if err := c.leaseMgr.Release(desc); err != nil {
return nil, err
}
tableDesc = desc.(catalog.TableDescriptor)
desc.Release(ctx)
if tableDesc.ContainsUserDefinedTypes() {
// If the table contains user defined types, then use the descs.Collection
// to retrieve a TableDescriptor with type metadata hydrated. We open a
Expand Down
27 changes: 12 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,22 +338,17 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {

case 1:
// Past deadline.
if !txn.UpdateDeadlineMaybe(ctx, pushedTimestamp.Prev()) {
t.Fatalf("did not update deadline")
}

err := txn.UpdateDeadline(ctx, pushedTimestamp.Prev())
require.NoError(t, err, "Deadline update to past failed")
case 2:
// Equal deadline.
if !txn.UpdateDeadlineMaybe(ctx, pushedTimestamp) {
t.Fatalf("did not update deadline")
}
err := txn.UpdateDeadline(ctx, pushedTimestamp)
require.NoError(t, err, "Deadline update to equal failed")

case 3:
// Future deadline.

if !txn.UpdateDeadlineMaybe(ctx, pushedTimestamp.Next()) {
t.Fatalf("did not update deadline")
}
err := txn.UpdateDeadline(ctx, pushedTimestamp.Next())
require.NoError(t, err, "Deadline update to future failed")
}
err = txn.CommitOrCleanup(ctx)

Expand Down Expand Up @@ -2163,11 +2158,12 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
t.Run("standalone commit", func(t *testing.T) {
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
// Set a deadline. We'll generate a retriable error with a higher timestamp.
txn.UpdateDeadlineMaybe(ctx, clock.Now())
err := txn.UpdateDeadline(ctx, clock.Now())
require.NoError(t, err, "Deadline update to now failed")
if _, err := txn.Get(ctx, "k"); err != nil {
t.Fatal(err)
}
err := txn.Commit(ctx)
err = txn.Commit(ctx)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
Expand All @@ -2177,10 +2173,11 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
t.Run("commit in batch", func(t *testing.T) {
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
// Set a deadline. We'll generate a retriable error with a higher timestamp.
txn.UpdateDeadlineMaybe(ctx, clock.Now())
err := txn.UpdateDeadline(ctx, clock.Now())
require.NoError(t, err, "Deadline update to now failed")
b := txn.NewBatch()
b.Get("k")
err := txn.CommitInBatch(ctx, b)
err = txn.CommitInBatch(ctx, b)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
Expand Down
28 changes: 13 additions & 15 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,29 +684,27 @@ func (txn *Txn) CommitOrCleanup(ctx context.Context) error {
return err
}

// UpdateDeadlineMaybe sets the transactions deadline to the lower of the
// current one (if any) and the passed value.
// UpdateDeadline sets the transactions deadline to the passed deadline.
// It may move the deadline to any timestamp above the current read timestamp.
//
// The deadline cannot be lower than txn.ReadTimestamp.
func (txn *Txn) UpdateDeadlineMaybe(ctx context.Context, deadline hlc.Timestamp) bool {
func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) error {
if txn.typ != RootTxn {
panic(errors.WithContextTags(errors.AssertionFailedf("UpdateDeadlineMaybe() called on leaf txn"), ctx))
panic(errors.WithContextTags(errors.AssertionFailedf("UpdateDeadline() called on leaf txn"), ctx))
}

txn.mu.Lock()
defer txn.mu.Unlock()
if txn.mu.deadline == nil || deadline.Less(*txn.mu.deadline) {
readTimestamp := txn.readTimestampLocked()
if deadline.Less(txn.readTimestampLocked()) {
log.Fatalf(ctx, "deadline below read timestamp is nonsensical; "+
"txn has would have no change to commit. Deadline: %s. Read timestamp: %s.",
deadline, readTimestamp)
}
txn.mu.deadline = new(hlc.Timestamp)
*txn.mu.deadline = deadline
return true

readTimestamp := txn.readTimestampLocked()
if deadline.Less(readTimestamp) {
log.Fatalf(ctx, "deadline below read timestamp is nonsensical; "+
"txn has would have no chance to commit. Deadline: %s. Read timestamp: %s Previous Deadline: %s.",
deadline, readTimestamp, txn.mu.deadline)
}
return false
txn.mu.deadline = new(hlc.Timestamp)
*txn.mu.deadline = deadline
return nil
}

// resetDeadlineLocked resets the deadline.
Expand Down
19 changes: 9 additions & 10 deletions pkg/kv/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,25 +497,24 @@ func TestUpdateDeadlineMaybe(t *testing.T) {
}

deadline := hlc.Timestamp{WallTime: 10, Logical: 1}
if !txn.UpdateDeadlineMaybe(ctx, deadline) {
t.Errorf("expected update, but it didn't happen")
}
err := txn.UpdateDeadline(ctx, deadline)
require.NoError(t, err, "Deadline update failed")
if d := *txn.deadline(); d != deadline {
t.Errorf("unexpected deadline: %s", d)
}

// Deadline is always updated now, there is no
// maybe.
futureDeadline := hlc.Timestamp{WallTime: 11, Logical: 1}
if txn.UpdateDeadlineMaybe(ctx, futureDeadline) {
t.Errorf("expected no update, but update happened")
}
if d := *txn.deadline(); d != deadline {
err = txn.UpdateDeadline(ctx, futureDeadline)
require.NoError(t, err, "Future deadline update failed")
if d := *txn.deadline(); d != futureDeadline {
t.Errorf("unexpected deadline: %s", d)
}

pastDeadline := hlc.Timestamp{WallTime: 9, Logical: 1}
if !txn.UpdateDeadlineMaybe(ctx, pastDeadline) {
t.Errorf("expected update, but it didn't happen")
}
err = txn.UpdateDeadline(ctx, pastDeadline)
require.NoError(t, err, "Past deadline update failed")
if d := *txn.deadline(); d != pastDeadline {
t.Errorf("unexpected deadline: %s", d)
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,6 @@ type SQLConfig struct {
// The tenant that the SQL server runs on the behalf of.
TenantID roachpb.TenantID

// LeaseManagerConfig holds configuration values specific to the LeaseManager.
LeaseManagerConfig *base.LeaseManagerConfig

// SocketFile, if non-empty, sets up a TLS-free local listener using
// a unix datagram socket at the specified path.
SocketFile string
Expand Down Expand Up @@ -347,7 +344,6 @@ func MakeSQLConfig(tenID roachpb.TenantID, tempStorageCfg base.TempStorageConfig
TableStatCacheSize: defaultSQLTableStatCacheSize,
QueryCacheSize: defaultSQLQueryCacheSize,
TempStorageConfig: tempStorageCfg,
LeaseManagerConfig: base.NewLeaseManagerConfig(),
}
return sqlCfg
}
Expand Down
1 change: 0 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
lmKnobs,
cfg.stopper,
cfg.rangeFeedFactory,
cfg.LeaseManagerConfig,
)
cfg.registry.AddMetricStruct(leaseMgr.MetricsStruct())

Expand Down
5 changes: 0 additions & 5 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config {
cfg.TestingKnobs = params.Knobs
cfg.RaftConfig = params.RaftConfig
cfg.RaftConfig.SetDefaults()
if params.LeaseManagerConfig != nil {
cfg.LeaseManagerConfig = params.LeaseManagerConfig
} else {
cfg.LeaseManagerConfig = base.NewLeaseManagerConfig()
}
if params.JoinAddr != "" {
cfg.JoinList = []string{params.JoinAddr}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/settings/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

// MaxSettings is the maximum number of settings that the system supports.
// Exported for tests.
const MaxSettings = 288
const MaxSettings = 512

// Values is a container that stores values for all registered settings.
// Each setting is assigned a unique slot (up to MaxSettings).
Expand Down
Loading

0 comments on commit dbcda1f

Please sign in to comment.