Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
118995: schemachanger: support CREATE SCHEMA ... AUTHORIZATION r=fqazi a=annrpom

This patch enables support for `CREATE SCHEMA ... AUTHORIZATION` in the declarative schema changer

Fixes: #115369
Epic: [CRDB-31331](https://cockroachlabs.atlassian.net/browse/CRDB-31331)

Release note: None

119752: sqlstats: dont block flush due to sql activity job r=xinhaoz a=xinhaoz

The sql stats flush worker signals the sql activity update job on flush completion via an unbuffered channel. It currently blocks on sending that signal so if that job is stuck updating stats for some reason this also makes the flush stuck. This commit ensures that the flush worker can continue normally even if the sql activity update job is not ready to receive its next signal. This ensures that the coordinator node for the sql stats activity job can continue to collect sql stats normally.

Epic: none
Fixes: #119751

119971: sqlstats: skip TestTransactionServiceLatencyOnExtendedProtocol r=xinhaoz a=xinhaoz

Skip this test until we fix the data race caused by the testing knob functions.

Epic: none

Release note: None

119977: sqlstats: add retries to stats test on locked table r=xinhaoz a=dhartunian

Previously, the `TestSQLStatsReadLimitSizeOnLockedTable` test would fail very occasionally due to a rare scenario. When stats are written, they contain a column that's a hashed shard index. It's expected that statements are uniformly distributed across this shard, but that's not guaranteed. Later in the test we check a random shard to make sure its stats count exceeds a minimum of 1 (because we place a total lower bound of 8 in the cluster setting, which is then divided by 8 to derive the per-shard limit). This case will occasionally fail if the random shard that's picked happens to only contain a single statement within.

This change modifies the loop at the end of the test to expect a `false` value and make sure to get at least a single `true` result after 3 iterations, instead of requiring 3 `true` results every single time. The requirement that the queries run despite contention will still stand since we'll return an error in that case.

Resolves: #119067
Epic: None

Release note: None

119999: sql: fix mixed version test for node-level sequence caching r=jasminejsun a=jasminejsun

Fix mixed version test for node-level sequence caching by making it wait for upgrade to complete.

Epic: none
Fixes: #119978

Release note: None

120005: logictest: update cache size for on-disk logictest configs r=rafiss a=rafiss

The default cache size is 128MB. This can result in disk-related slowdowns for normal operations. Now we configure a larger cache so that we avoid disk access.

fixes #119897
Release note: None

Co-authored-by: Annie Pompa <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Jasmine Sun <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
6 people committed Mar 6, 2024
7 parents 549a5c2 + 9be6368 + 29df8ab + 7a9011a + fa75860 + d6981c4 + efc2cd5 commit 82eb30e
Show file tree
Hide file tree
Showing 19 changed files with 117 additions and 48 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1600,10 +1600,10 @@ def go_deps():
name = "com_github_cockroachdb_cockroach_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/cockroachdb/cockroach-go/v2",
sha256 = "70860a2615f3df73f7d00b4801b1fffe30a3306ae1cda1e9bf5245bb74e86d9a",
strip_prefix = "github.com/cockroachdb/cockroach-go/[email protected].5",
sha256 = "028c29c79c2d373bca3ce9a475291285fdcb68a2f908190f738d5ce605edbd07",
strip_prefix = "github.com/cockroachdb/cockroach-go/[email protected].7",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/cockroach-go/v2/com_github_cockroachdb_cockroach_go_v2-v2.3.5.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/cockroach-go/v2/com_github_cockroachdb_cockroach_go_v2-v2.3.7.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/apd/v3/com_github_cockroachdb_apd_v3-v3.2.1.zip": "6ad54bb71a36fba8ca6725a00d916e51815a4c68de54096313ca6fffda6c87c2",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/bubbletea/com_github_cockroachdb_bubbletea-v0.23.1-bracketed-paste2.zip": "d7916a0e7d8d814566e8f8d162c3764aea947296396a0a669564ff3ee53414bc",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/cmux/com_github_cockroachdb_cmux-v0.0.0-20170110192607-30d10be49292.zip": "88f6f9cf33eb535658540b46f6222f029398e590a3ff9cc873d7d561ac6debf0",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/cockroach-go/v2/com_github_cockroachdb_cockroach_go_v2-v2.3.5.zip": "70860a2615f3df73f7d00b4801b1fffe30a3306ae1cda1e9bf5245bb74e86d9a",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/cockroach-go/v2/com_github_cockroachdb_cockroach_go_v2-v2.3.7.zip": "028c29c79c2d373bca3ce9a475291285fdcb68a2f908190f738d5ce605edbd07",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/crlfmt/com_github_cockroachdb_crlfmt-v0.0.0-20221214225007-b2fc5c302548.zip": "fedc01bdd6d964da0425d5eaac8efadc951e78e13f102292cc0774197f09ab63",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/datadriven/com_github_cockroachdb_datadriven-v1.0.3-0.20230801171734-e384cf455877.zip": "3f1ac51e496ac26ae297c9846bb78eb416fd50b7566b892fb7087af9273766fe",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/errors/com_github_cockroachdb_errors-v1.11.1.zip": "6c8101703839a0d69685485d3d0d6040f51b83382bbc53c362c47732246249bf",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ require (
github.com/client9/misspell v0.3.4
github.com/cockroachdb/apd/v3 v3.2.1
github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292
github.com/cockroachdb/cockroach-go/v2 v2.3.5
github.com/cockroachdb/cockroach-go/v2 v2.3.7
github.com/cockroachdb/crlfmt v0.0.0-20221214225007-b2fc5c302548
github.com/cockroachdb/datadriven v1.0.3-0.20230801171734-e384cf455877
github.com/cockroachdb/errors v1.11.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ github.com/cockroachdb/bubbletea v0.23.1-bracketed-paste2 h1:OmQDBlTg1DU5OaKjIuE
github.com/cockroachdb/bubbletea v0.23.1-bracketed-paste2/go.mod h1:JAfGK/3/pPKHTnAS8JIE2u9f61BjWTQY57RbT25aMXU=
github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292 h1:dzj1/xcivGjNPwwifh/dWTczkwcuqsXXFHY1X/TZMtw=
github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292/go.mod h1:qRiX68mZX1lGBkTWyp3CLcenw9I94W2dLeRvMzcn9N4=
github.com/cockroachdb/cockroach-go/v2 v2.3.5 h1:Khtm8K6fTTz/ZCWPzU9Ne3aOW9VyAnj4qIPCJgKtwK0=
github.com/cockroachdb/cockroach-go/v2 v2.3.5/go.mod h1:1wNJ45eSXW9AnOc3skntW9ZUZz6gxrQK3cOj3rK+BC8=
github.com/cockroachdb/cockroach-go/v2 v2.3.7 h1:nq5GYDuA2zIR/kdLkVLTg7oHTw0UbGU9RWpC+OZVYYU=
github.com/cockroachdb/cockroach-go/v2 v2.3.7/go.mod h1:1wNJ45eSXW9AnOc3skntW9ZUZz6gxrQK3cOj3rK+BC8=
github.com/cockroachdb/crlfmt v0.0.0-20221214225007-b2fc5c302548 h1:i0bnjanlWAvM50wHMT7EFyxlt5HQusznWrkwl+HBIsU=
github.com/cockroachdb/crlfmt v0.0.0-20221214225007-b2fc5c302548/go.mod h1:qtkxNlt5i3rrdirfJE/bQeW/IeLajKexErv7jEIV+Uc=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,7 @@ func (t *logicTest) newTestServerCluster(bootstrapBinaryPath, upgradeBinaryPath
opts := []testserver.TestServerOpt{
testserver.ThreeNodeOpt(),
testserver.StoreOnDiskOpt(),
testserver.CacheSizeOpt(0.1),
testserver.CockroachBinaryPathOpt(bootstrapBinaryPath),
testserver.UpgradeCockroachBinaryPathOpt(upgradeBinaryPath),
testserver.PollListenURLTimeoutOpt(120),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ upgrade 1

upgrade 2

statement ok
SET CLUSTER SETTING version = crdb_internal.node_executable_version();

query T nodeidx=1
SELECT crdb_internal.release_series(crdb_internal.node_executable_version())
----
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/schema
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ CREATE USER user1;
statement ok
CREATE SCHEMA AUTHORIZATION user1

statement error pq: role/user "typo" does not exist
CREATE SCHEMA AUTHORIZATION typo

statement error pq: schema "user1" already exists
CREATE SCHEMA AUTHORIZATION user1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -72,9 +71,17 @@ func CreateSchema(b BuildCtx, n *tree.CreateSchema) {
// via AUTHORIZATION clause.
owner := b.CurrentUser()
if !n.AuthRole.Undefined() {
// TODO (xiang): Support "CREATE SCHEMA AUTHORIZATION <owner>".
panic(scerrors.NotImplementedErrorf(n, "create schema specifying owner with "+
"AUTHORIZATION is not implemented yet"))
authRole, err := decodeusername.FromRoleSpec(
b.SessionData(), username.PurposeValidation, n.AuthRole,
)
if err != nil {
panic(err)
}
// Block CREATE SCHEMA AUTHORIZATION "foo" when "foo" isn't an existing user.
if err = b.CheckRoleExists(b, authRole); err != nil {
panic(sqlerrors.NewUndefinedUserError(authRole))
}
owner = authRole
}

// 6. Finally, create and add constituent elements to builder state.
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/schemachanger/scbuild/testdata/unimplemented_create
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ unimplemented
CREATE DATABASE db PRIMARY REGION "us-east1" REGIONS "us-east1", "us-central1", "us-west1" SURVIVE REGION FAILURE;
----

unimplemented
CREATE SCHEMA sc AUTHORIZATION roacher;
----

unimplemented
CREATE TYPE typ AS ENUM('a','b');
----
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
setup
CREATE USER foo WITH LOGIN PASSWORD 'bar';
----

test
CREATE SCHEMA sc;
CREATE SCHEMA sc AUTHORIZATION foo;
----
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/* setup */
CREATE USER foo WITH LOGIN PASSWORD 'bar';

/* test */
EXPLAIN (DDL) CREATE SCHEMA sc;
EXPLAIN (DDL) CREATE SCHEMA sc AUTHORIZATION foo;
----
Schema change plan for CREATE SCHEMA ‹defaultdb›.‹sc›;
Schema change plan for CREATE SCHEMA ‹defaultdb›.‹sc› AUTHORIZATION foo;
├── StatementPhase
│ └── Stage 1 of 1 in StatementPhase
│ ├── 6 elements transitioning toward PUBLIC
Expand All @@ -18,7 +19,7 @@ Schema change plan for CREATE SCHEMA ‹defaultdb›.‹sc›;
│ ├── SetNameInDescriptor {"DescriptorID":104,"Name":"sc"}
│ ├── AddDescriptorName {"Namespace":{"DatabaseID":100,"DescriptorID":104,"Name":"sc"}}
│ ├── AddSchemaParent {"Parent":{"ParentDatabaseID":100,"SchemaID":104}}
│ ├── UpdateOwner {"Owner":{"DescriptorID":104,"Owner":"root"}}
│ ├── UpdateOwner {"Owner":{"DescriptorID":104,"Owner":"foo"}}
│ ├── UpdateUserPrivileges {"Privileges":{"DescriptorID":104,"Privileges":2,"UserName":"admin","WithGrantOption":2}}
│ ├── UpdateUserPrivileges {"Privileges":{"DescriptorID":104,"Privileges":2,"UserName":"root","WithGrantOption":2}}
│ └── MarkDescriptorAsPublic {"DescriptorID":104}
Expand Down Expand Up @@ -46,7 +47,7 @@ Schema change plan for CREATE SCHEMA ‹defaultdb›.‹sc›;
├── SetNameInDescriptor {"DescriptorID":104,"Name":"sc"}
├── AddDescriptorName {"Namespace":{"DatabaseID":100,"DescriptorID":104,"Name":"sc"}}
├── AddSchemaParent {"Parent":{"ParentDatabaseID":100,"SchemaID":104}}
├── UpdateOwner {"Owner":{"DescriptorID":104,"Owner":"root"}}
├── UpdateOwner {"Owner":{"DescriptorID":104,"Owner":"foo"}}
├── UpdateUserPrivileges {"Privileges":{"DescriptorID":104,"Privileges":2,"UserName":"admin","WithGrantOption":2}}
├── UpdateUserPrivileges {"Privileges":{"DescriptorID":104,"Privileges":2,"UserName":"root","WithGrantOption":2}}
└── MarkDescriptorAsPublic {"DescriptorID":104}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* setup */
CREATE USER foo WITH LOGIN PASSWORD 'bar';

/* test */
EXPLAIN (DDL, SHAPE) CREATE SCHEMA sc;
EXPLAIN (DDL, SHAPE) CREATE SCHEMA sc AUTHORIZATION foo;
----
Schema change plan for CREATE SCHEMA ‹defaultdb›.‹sc›;
Schema change plan for CREATE SCHEMA ‹defaultdb›.‹sc› AUTHORIZATION foo;
└── execute 1 system table mutations transaction
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
/* setup */
CREATE USER foo WITH LOGIN PASSWORD 'bar';
----

...

/* test */
CREATE SCHEMA sc;
CREATE SCHEMA sc AUTHORIZATION foo;
----
begin transaction #1
# begin StatementPhase
checking for feature: CREATE SCHEMA
increment telemetry for sql.schema.create_schema
checking role/user "foo" exists
write *eventpb.CreateSchema to event log:
owner: root
owner: foo
schemaName: defaultdb.sc
sql:
descriptorId: 104
statement: CREATE SCHEMA ‹defaultdb›.‹sc›
statement: CREATE SCHEMA ‹defaultdb›.‹sc› AUTHORIZATION foo
tag: CREATE SCHEMA
user: root
## StatementPhase stage 1 of 1 with 8 MutationType ops
Expand All @@ -27,7 +29,7 @@ upsert descriptor #104
+ name: sc
+ parentId: 100
+ privileges:
+ ownerProto: root
+ ownerProto: foo
+ users:
+ - privileges: "2"
+ userProto: admin
Expand Down Expand Up @@ -60,7 +62,7 @@ upsert descriptor #104
+ name: sc
+ parentId: 100
+ privileges:
+ ownerProto: root
+ ownerProto: foo
+ users:
+ - privileges: "2"
+ userProto: admin
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ var sqlStatsLimitTableSizeEnabled = settings.RegisterBoolSetting(
true,
)

// sqlStatsLimitTableCheckInterval is the cluster setting the controls what
// SQLStatsLimitTableCheckInterval is the cluster setting the controls what
// interval the check is done if the statement and transaction statistics
// tables have grown past the sql.stats.persisted_rows.max.
var sqlStatsLimitTableCheckInterval = settings.RegisterDurationSetting(
var SQLStatsLimitTableCheckInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"sql.stats.limit_table_size_check.interval",
"controls what interval the check is done if the statement and "+
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *PersistedSQLStats) Flush(ctx context.Context, stopper *stop.Stopper) {
func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, error) {
// Doing a count check on every flush for every node adds a lot of overhead.
// To reduce the overhead only do the check once an hour by default.
intervalToCheck := sqlStatsLimitTableCheckInterval.Get(&s.cfg.Settings.SV)
intervalToCheck := SQLStatsLimitTableCheckInterval.Get(&s.cfg.Settings.SV)
if !s.lastSizeCheck.IsZero() && s.lastSizeCheck.Add(intervalToCheck).After(timeutil.Now()) {
log.Infof(ctx, "PersistedSQLStats.StmtsLimitSizeReached skipped with last check at: %s and check interval: %s", s.lastSizeCheck, intervalToCheck)
return false, nil
Expand Down
77 changes: 60 additions & 17 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,8 +682,7 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
// Ensure we have some rows in system.statement_statistics
require.GreaterOrEqual(t, stmtStatsCountFlush, minNumExpectedStmts)

// Set sql.stats.persisted_rows.max
sqlConn.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.stats.persisted_rows.max=%d", maxNumPersistedRows))
persistedsqlstats.SQLStatsMaxPersistedRows.Override(ctx, &s.ClusterSettings().SV, maxNumPersistedRows)

// We need SucceedsSoon here for the follower read timestamp to catch up
// enough for this state to be reached.
Expand All @@ -705,7 +704,7 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {

// Set table size check interval to .0000001 second. So the next check doesn't
// use the cached value.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.limit_table_size_check.interval='.0000001s'")
persistedsqlstats.SQLStatsLimitTableCheckInterval.Override(ctx, &s.ClusterSettings().SV, time.Nanosecond)

// Begin a transaction.
sqlConn.Exec(t, "BEGIN")
Expand All @@ -714,28 +713,35 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {

// Ensure that we can read from the table despite it being locked, due to the follower read (AOST).
// Expect that the number of statements in the table exceeds sql.stats.persisted_rows.max * 1.5
// (meaning that the limit will be reached) and no error. Loop to make sure that
// checking it multiple times still returns the correct value.
// (meaning that the limit will be reached) and no error. Every iteration picks a random shard, and we
// loop 3 times to make sure that we find at least one shard with a count over the limit. In the wild,
// we've observed individual shards only having a single statement recorded which makes this check fail
// otherwise.
foundLimit := false
for i := 0; i < 3; i++ {
limitReached, err = pss.StmtsLimitSizeReached(ctx)
require.NoError(t, err)
if !limitReached {
readStmt := `SELECT crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, count(*)
if limitReached {
foundLimit = true
}
}

if !foundLimit {
readStmt := `SELECT crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, count(*)
FROM system.statement_statistics
AS OF SYSTEM TIME follower_read_timestamp()
GROUP BY crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8`

sqlConn2 := sqlutils.MakeSQLRunner(s.SQLConn(t))
rows := sqlConn2.Query(t, readStmt)
shard := make([]int64, 8)
count := make([]int64, 8)
for j := 0; rows.Next(); {
err := rows.Scan(&shard[j], &count[j])
require.NoError(t, err)
j += 1
}
t.Fatalf("limitReached should be true. loop: %d; shards: %d counts: %d", i, shard, count)
sqlConn2 := sqlutils.MakeSQLRunner(s.SQLConn(t))
rows := sqlConn2.Query(t, readStmt)
shard := make([]int64, 8)
count := make([]int64, 8)
for j := 0; rows.Next(); {
err := rows.Scan(&shard[j], &count[j])
require.NoError(t, err)
j += 1
}
t.Fatalf("limitReached should be true. shards: %d counts: %d", shard, count)
}

// Close the transaction.
Expand Down Expand Up @@ -1192,3 +1198,40 @@ func smallestStatsCountAcrossAllShards(

return numStmtStats
}

func TestSQLStatsFlushDoesntWaitForFlushSigReceiver(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

sqlStatsKnobs := sqlstats.CreateTestingKnobs()
var sqlStmtFlushCount, sqlTxnFlushCount atomic.Int32
sqlStatsKnobs.OnStmtStatsFlushFinished = func() {
sqlStmtFlushCount.Add(1)
}
sqlStatsKnobs.OnTxnStatsFlushFinished = func() {
sqlTxnFlushCount.Add(1)
}
tc := serverutils.StartCluster(t, 3 /* numNodes */, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLStatsKnobs: sqlStatsKnobs,
},
},
})

ctx := context.Background()
defer tc.Stopper().Stop(ctx)

ss := tc.ApplicationLayer(0).SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
flushDoneCh := make(chan struct{})
ss.SetFlushDoneSignalCh(flushDoneCh)

// It should not block on the flush signal receiver.
persistedsqlstats.SQLStatsFlushInterval.Override(ctx, &tc.Server(0).ClusterSettings().SV, 100*time.Millisecond)
testutils.SucceedsSoon(t, func() error {
if sqlStmtFlushCount.Load() < 5 || sqlTxnFlushCount.Load() < 5 {
return errors.New("flush count hasn't been reached yet")
}
return nil
})
}
5 changes: 5 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper
return
case <-s.drain:
return
default:
// Don't block the flush loop if the sql activity update job is not
// ready to receive. We should at least continue to collect and flush
// stats for this node.
log.Warning(ctx, "sql-stats-worker: unable to signal flush completion")
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/sslocal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_test(
"//pkg/sql/sqlstats/ssmemstorage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlstats/sslocal/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/ssmemstorage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -712,6 +713,7 @@ func TestUnprivilegedUserReset(t *testing.T) {
func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 119580)
ctx := context.Background()

testData := []*struct {
Expand Down

0 comments on commit 82eb30e

Please sign in to comment.