96257: kvserver: log if lease applies with a delay r=erikgrinaker a=tbg

When we transfer a lease to a lagging follower, there's often a latency
blip that we get asked to investigate. This is time-consuming; it's
often subtle even to figure out that it happened. We try to be better
about not doing it, but at least on 22.1 we know it's possible, and we
can't backport the rather involved fixes.

This warning makes it fairly obvious when it happens.

> W230131 [...] [T1,n2,s2,r23/3:‹/Table/2{1-2}›,raft] 165  lease repl=(n2,s2):3 seq=5 start=1675153630.108829000,0 epo=3 pro=1675153630.108829000,0 active after replication lag of ~0.58s; foreground traffic may have been impacted [prev=repl=(n3,s3):2 seq=4 start=1675153407.528408000,0 epo=2 pro=1675153419.837642000,0]
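
For context, a condensed sketch of the new check (the full implementation is `maybeLogSlowLeaseApplyWarning` in the `pkg/kv/kvserver/replica_proposal.go` diff below; `now` is the local clock reading at lease application):

```go
// Warn when a lease becomes visible on the new leaseholder well after it
// was proposed; the gap approximates replication lag. ProposedTS may come
// from another node's clock, so some skew is silently tolerated.
const slowLeaseApplyWarnThreshold = 500 * time.Millisecond
delay := time.Duration(now.WallTime - newLease.ProposedTS.WallTime)
if delay > slowLeaseApplyWarnThreshold {
	log.Warningf(ctx,
		"lease %v active after replication lag of ~%.2fs; foreground traffic may have been impacted [prev=%v]",
		newLease, delay.Seconds(), prevLease)
}
```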

Addresses #95991.

Epic: none
Release note: None


96529: backupccl: add missing context cancel checks in gen split scatter processor r=rhu713 a=rhu713

Add the rest of the missing context cancel checks in restore's generativeSplitAndScatterProcessor. Add a red/green test to show that runGenerativeSplitAndScatter is interrupted if its supplied context is canceled.
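
The fix uses the standard cancelable-send pattern everywhere a bare channel send could block. A minimal self-contained sketch (`sendOrCancel` is a hypothetical helper, not part of the patch):

```go
// sendOrCancel sends item on ch, but aborts as soon as ctx is canceled, so a
// goroutine blocked on an unbuffered or full channel cannot outlive its context.
func sendOrCancel[T any](ctx context.Context, ch chan<- T, item T) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- item:
		return nil
	}
}
```

A bare `ch <- item` blocks forever once the receiver has gone away; that is the hang the red/green test below reproduces and the select avoids.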

Fixes: #95257

Release note: None

96796: sql: remove sql obs V22_2 gates r=maryliag a=maryliag

With the minimum version bumped to 22.2, it's safe
to delete the version gates for 22.2.
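
Deleting a gate is mechanical; an illustrative before/after with a made-up gate name:

```go
// Before: new behavior guarded by a 22.2 version gate.
if settings.Version.IsActive(ctx, clusterversion.TODODelete_V22_2SomeFeature) {
	doNewBehavior() // hypothetical
}

// After: with the minimum supported version at 22.2 the gate is always
// active, so the code runs unconditionally and the gate constant is
// deleted from pkg/clusterversion.
doNewBehavior()
```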

Fixes #96758

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Rui Hu <[email protected]>
Co-authored-by: maryliag <[email protected]>
4 people committed Feb 8, 2023
4 parents 87d1fed + 548a4be + d91fc4b + 4993986 commit dbfc71b
Showing 15 changed files with 225 additions and 294 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -169,6 +169,7 @@ go_test(
         "create_scheduled_backup_test.go",
         "datadriven_test.go",
         "full_cluster_backup_restore_test.go",
+        "generative_split_and_scatter_processor_test.go",
         "key_rewriter_test.go",
         "main_test.go",
         "partitioned_backup_test.go",
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -273,6 +273,8 @@ func TestBackupRestoreJobTagAndLabel(t *testing.T) {
 
 	backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)
 
+	mu.Lock()
+	defer mu.Unlock()
 	require.True(t, found)
 }
 
18 changes: 16 additions & 2 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -321,14 +321,22 @@ func runGenerativeSplitAndScatter(
 			idx++
 			if len(chunk.entries) == int(spec.ChunkSize) {
 				chunk.splitKey = entry.Span.Key
-				restoreEntryChunksCh <- chunk
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case restoreEntryChunksCh <- chunk:
+				}
 				chunk = restoreEntryChunk{}
 			}
 			chunk.entries = append(chunk.entries, entry)
 		}
 
 		if len(chunk.entries) > 0 {
-			restoreEntryChunksCh <- chunk
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case restoreEntryChunksCh <- chunk:
+			}
 		}
 		return nil
 	})
@@ -421,6 +429,12 @@ func runGenerativeSplitAndScatter(
 			node: chunkDestination,
 		}
 
+		if restoreKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
+			if restoreKnobs.RunAfterSplitAndScatteringEntry != nil {
+				restoreKnobs.RunAfterSplitAndScatteringEntry(ctx)
+			}
+		}
+
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
140 changes: 140 additions & 0 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go
@@ -0,0 +1,140 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/stretchr/testify/require"
)

// TestRunGenerativeSplitAndScatterContextCancel verifies that
// runGenerativeSplitAndScatter can be interrupted by canceling the supplied
// context. This test would time out if the context cancellation does not
// interrupt the function.
func TestRunGenerativeSplitAndScatterContextCancel(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const numAccounts = 1000
	const localFoo = "nodelocal://0/foo"
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts,
		InitManualReplication)
	defer cleanupFn()

	st := cluster.MakeTestingClusterSettings()
	evalCtx := eval.MakeTestingEvalContext(st)

	testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
	defer testDiskMonitor.Stop(ctx)

	// Set up the test so that the test context is canceled after the first entry
	// has been processed by the generative split and scatterer.
	s0 := tc.Server(0)
	registry := tc.Server(0).JobRegistry().(*jobs.Registry)
	execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
	flowCtx := execinfra.FlowCtx{
		Cfg: &execinfra.ServerConfig{
			Settings:       st,
			DB:             s0.InternalDB().(descs.DB),
			JobRegistry:    registry,
			ExecutorConfig: &execCfg,
			TestingKnobs: execinfra.TestingKnobs{
				BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
					RunAfterSplitAndScatteringEntry: func(ctx context.Context) {
						cancel()
					},
				},
			},
		},
		EvalCtx:     &evalCtx,
		Mon:         evalCtx.TestingMon,
		DiskMonitor: testDiskMonitor,
		NodeID:      evalCtx.NodeID,
	}

	sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
	sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)

	backups := sqlDB.QueryStr(t, `SHOW BACKUPS IN $1`, localFoo)
	require.Equal(t, 1, len(backups))
	uri := localFoo + "/" + backups[0][0]

	codec := keys.MakeSQLCodec(s0.RPCContext().TenantID)
	backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(tc.Servers[0].DB(), codec, "data", "bank")
	backupStartKey := backupTableDesc.PrimaryIndexSpan(codec).Key

	spec := makeTestingGenerativeSplitAndScatterSpec(
		[]string{uri},
		[]roachpb.Span{{
			Key:    backupStartKey,
			EndKey: backupStartKey.PrefixEnd(),
		}},
	)

	oldID := backupTableDesc.GetID()
	newID := backupTableDesc.GetID() + 1
	newDesc := protoutil.Clone(backupTableDesc.TableDesc()).(*descpb.TableDescriptor)
	newDesc.ID = newID
	tableRekeys := []execinfrapb.TableRekey{
		{
			OldID:   uint32(oldID),
			NewDesc: mustMarshalDesc(t, newDesc),
		},
	}

	kr, err := MakeKeyRewriterFromRekeys(keys.SystemSQLCodec, tableRekeys, nil, false)
	require.NoError(t, err)

	chunkSplitScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}
	chunkEntrySpliterScatterers := []splitAndScatterer{makeSplitAndScatterer(flowCtx.Cfg.DB.KV(), kr)}

	// Large enough so doneScatterCh never blocks.
	doneScatterCh := make(chan entryNode, 1000)
	err = runGenerativeSplitAndScatter(ctx, &flowCtx, &spec, chunkSplitScatterers, chunkEntrySpliterScatterers, doneScatterCh)

	require.Error(t, err, "context canceled")
}

func makeTestingGenerativeSplitAndScatterSpec(
	backupURIs []string, requiredSpans []roachpb.Span,
) execinfrapb.GenerativeSplitAndScatterSpec {
	return execinfrapb.GenerativeSplitAndScatterSpec{
		ValidateOnly:         false,
		URIs:                 backupURIs,
		Encryption:           nil,
		EndTime:              hlc.Timestamp{},
		Spans:                requiredSpans,
		BackupLocalityInfo:   nil,
		HighWater:            nil,
		UserProto:            "",
		ChunkSize:            1,
		TargetSize:           1,
		NumEntries:           1,
		NumNodes:             1,
		JobID:                0,
		UseSimpleImportSpans: false,
	}
}
7 changes: 0 additions & 7 deletions pkg/clusterversion/cockroach_versions.go
@@ -228,9 +228,6 @@ const (
 	TODODelete_V22_2AlterSystemSQLInstancesAddLocality
 	// TODODelete_V22_2SystemExternalConnectionsTable adds system.external_connections table.
 	TODODelete_V22_2SystemExternalConnectionsTable
-	// TODODelete_V22_2AlterSystemStatementStatisticsAddIndexRecommendations adds an
-	// index_recommendations column to the system.statement_statistics table.
-	TODODelete_V22_2AlterSystemStatementStatisticsAddIndexRecommendations
 	// TODODelete_V22_2RoleIDSequence is the version where the system.role_id_sequence exists.
 	TODODelete_V22_2RoleIDSequence
 	// TODODelete_V22_2AddSystemUserIDColumn is the version where the system.users table has
@@ -539,10 +536,6 @@ var rawVersionsSingleton = keyedVersions{
 		Key:     TODODelete_V22_2SystemExternalConnectionsTable,
 		Version: roachpb.Version{Major: 22, Minor: 1, Internal: 30},
 	},
-	{
-		Key:     TODODelete_V22_2AlterSystemStatementStatisticsAddIndexRecommendations,
-		Version: roachpb.Version{Major: 22, Minor: 1, Internal: 32},
-	},
 	{
 		Key:     TODODelete_V22_2RoleIDSequence,
 		Version: roachpb.Version{Major: 22, Minor: 1, Internal: 34},
52 changes: 51 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
@@ -351,10 +351,17 @@ func (r *Replica) leasePostApplyLocked(
 	requiresExpirationBasedLease := r.requiresExpiringLeaseRLocked()
 	hasExpirationBasedLease := newLease.Type() == roachpb.LeaseExpiration
 
+	now := r.store.Clock().NowAsClockTimestamp()
+
+	// NB: ProposedTS is non-nil in practice, but we never fully migrated it
+	// in so we need to assume that it can be nil.
+	if iAmTheLeaseHolder && leaseChangingHands && newLease.ProposedTS != nil {
+		maybeLogSlowLeaseApplyWarning(ctx, time.Duration(now.WallTime-newLease.ProposedTS.WallTime), prevLease, newLease)
+	}
+
 	// Gossip the first range whenever its lease is acquired. We check to make
 	// sure the lease is active so that a trailing replica won't process an old
 	// lease request and attempt to gossip the first range.
-	now := r.store.Clock().NowAsClockTimestamp()
 	if leaseChangingHands && iAmTheLeaseHolder && r.IsFirstRange() && r.ownsValidLeaseRLocked(ctx, now) {
 		r.gossipFirstRangeLocked(ctx)
 	}
@@ -444,6 +451,49 @@ func (r *Replica) leasePostApplyLocked(
 	}
 }
 
+// maybeLogSlowLeaseApplyWarning is called when the lease changes hands on the
+// new leaseholder. It logs if either the new lease was proposed well before it
+// became visible on the leaseholder (indicating replication lag) or if the
+// previous lease looks like we transferred a lease to a behind/offline replica.
+func maybeLogSlowLeaseApplyWarning(
+	ctx context.Context, newLeaseAppDelay time.Duration, prevLease, newLease *roachpb.Lease,
+) {
+	const slowLeaseApplyWarnThreshold = 500 * time.Millisecond
+	if newLeaseAppDelay > slowLeaseApplyWarnThreshold {
+		// If we hold the lease now and the lease was proposed "earlier", there
+		// must have been replication lag, and possibly reads and/or writes were
+		// delayed.
+		//
+		// We see this most commonly with lease transfers targeting a behind replica,
+		// or, in the worst case, a snapshot. We are constantly improving our
+		// heuristics for avoiding that[^1] but if it does happen it's good to know
+		// from the logs.
+		//
+		// In the case of a lease transfer, the two timestamps compared below are from
+		// different clocks, so there could be skew. We just pretend this is not the
+		// case, which is good enough here.
+		//
+		// [^1]: https://github.com/cockroachdb/cockroach/pull/82758
+		log.Warningf(ctx,
+			"lease %v active after replication lag of ~%.2fs; foreground traffic may have been impacted [prev=%v]",
+			newLease, newLeaseAppDelay.Seconds(), prevLease,
+		)
+	} else if prevLease.Type() == roachpb.LeaseExpiration &&
+		newLease.Type() == roachpb.LeaseEpoch &&
+		newLease.AcquisitionType == roachpb.LeaseAcquisitionType_Request {
+		// If the previous lease is expiration-based, but the new lease is not and
+		// the acquisition was non-cooperative, it is likely that a lease transfer
+		// (which is expiration-based) went to a follower that then couldn't hold
+		// the lease alive (for example, didn't apply it in time for it to
+		// actually serve any traffic). The result was likely an outage which
+		// resolves right now, so log to point this out.
+		log.Warningf(ctx,
+			"lease %v expired before being followed by lease %s; foreground traffic may have been impacted",
+			prevLease, newLease,
+		)
+	}
+}
+
 var addSSTPreApplyWarn = struct {
 	threshold time.Duration
 	log.EveryN
21 changes: 1 addition & 20 deletions pkg/server/combined_statement_stats.go
@@ -17,7 +17,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
@@ -725,23 +724,6 @@ func getStatementDetailsPerPlanHash(
 
 	query := fmt.Sprintf(
-		`SELECT
-				plan_hash,
-				(statistics -> 'statistics' -> 'planGists'->>0) as plan_gist,
-				crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
-				crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
-				max(sampled_plan) as sampled_plan,
-				aggregation_interval
-			FROM crdb_internal.statement_statistics %s
-			GROUP BY
-				plan_hash,
-				plan_gist,
-				aggregation_interval
-			LIMIT $%d`, whereClause, len(args)+1)
-	expectedNumDatums := 6
-
-	if settings.Version.IsActive(ctx, clusterversion.TODODelete_V22_2AlterSystemStatementStatisticsAddIndexRecommendations) {
-		query = fmt.Sprintf(
 		`SELECT
 				plan_hash,
 				(statistics -> 'statistics' -> 'planGists'->>0) as plan_gist,
 				crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
@@ -756,8 +738,7 @@ func getStatementDetailsPerPlanHash(
 				aggregation_interval,
 				index_recommendations
 			LIMIT $%d`, whereClause, len(args)+1)
-		expectedNumDatums = 7
-	}
+	expectedNumDatums := 7
 
 	args = append(args, limit)
 
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -1632,6 +1632,10 @@ type BackupRestoreTestingKnobs struct {
 	// execution.
 	CaptureResolvedTableDescSpans func([]roachpb.Span)
 
+	// RunAfterSplitAndScatteringEntry allows blocking the RESTORE job after a
+	// single RestoreSpanEntry has been split and scattered.
+	RunAfterSplitAndScatteringEntry func(ctx context.Context)
+
 	// RunAfterProcessingRestoreSpanEntry allows blocking the RESTORE job after a
 	// single RestoreSpanEntry has been processed and added to the SSTBatcher.
 	RunAfterProcessingRestoreSpanEntry func(ctx context.Context)
1 change: 0 additions & 1 deletion pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
@@ -24,7 +24,6 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/base",
-        "//pkg/clusterversion",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/scheduledjobs",
21 changes: 9 additions & 12 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
@@ -15,7 +15,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -487,7 +486,14 @@ func (s *PersistedSQLStats) insertStatementStats(
 	plan := tree.NewDJSON(sqlstatsutil.ExplainTreePlanNodeToJSON(&stats.Stats.SensitiveInfo.MostRecentPlanDescription))
 	nodeID := s.GetEnabledSQLInstanceID()
 
-	values := "$1 ,$2, $3, $4, $5, $6, $7, $8, $9, $10"
+	indexRecommendations := tree.NewDArray(types.String)
+	for _, recommendation := range stats.Stats.IndexRecommendations {
+		if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil {
+			return 0, err
+		}
+	}
+
+	values := "$1 ,$2, $3, $4, $5, $6, $7, $8, $9, $10, $11"
 	args := append(make([]interface{}, 0, 11),
 		aggregatedTs,            // aggregated_ts
 		serializedFingerprintID, // fingerprint_id
@@ -499,17 +505,8 @@ func (s *PersistedSQLStats) insertStatementStats(
 		metadata,   // metadata
 		statistics, // statistics
 		plan,       // plan
+		indexRecommendations, // index_recommendations
 	)
-	if s.cfg.Settings.Version.IsActive(ctx, clusterversion.TODODelete_V22_2AlterSystemStatementStatisticsAddIndexRecommendations) {
-		values = values + ", $11"
-		indexRecommendations := tree.NewDArray(types.String)
-		for _, recommendation := range stats.Stats.IndexRecommendations {
-			if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil {
-				return 0, err
-			}
-		}
-		args = append(args, indexRecommendations)
-	}
 
 	insertStmt := fmt.Sprintf(`
 		INSERT INTO system.statement_statistics
