sql: atomic flushing of sql stats

This change refactors the existing logic for flushing SQL stats. The main goal
is to fix an issue where flushing and clearing the in-memory stats are not an
atomic operation, which can cause not-yet-persisted stats to be cleared
unintentionally.

This change introduces the following:
1. A `PopAllStatementsStats` function that prepares stats to be persisted and
then clears the in-memory stats as a single atomic operation.
2. The process of flushing stats is now:
    - pop all stats from in-memory storage;
    - reset the in-memory stats;
    - persist the local copy of the stats.
3. Previously, the process was:
    - iterate over the in-memory stats, which could still be updated during iteration;
    - persisting stats could take some time, and iterating over the stats is slow;
    - after flushing all stats, the in-memory storage is cleared, but there is no
      guarantee that nothing has been added to the SQL stats in the meantime.

The new implementation does have a disadvantage: between popping the stats from
in-memory storage and persisting them, users might not see up-to-date
information. This is assumed to be better than losing statistics permanently.
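For illustration only, here is a minimal sketch of the pop-then-persist pattern
described above. The `memStore` and `Stat` types and the `persist` callback are
hypothetical simplifications, not the actual CockroachDB API:

    package main

    import "sync"

    // Stat stands in for a single collected statistics record (hypothetical).
    type Stat struct{ Fingerprint string }

    // memStore is a simplified stand-in for the in-memory stats container.
    type memStore struct {
        mu struct {
            sync.Mutex
            stats map[string]Stat
        }
    }

    // popAll copies out the accumulated stats and resets the map under a
    // single lock acquisition, so stats recorded afterwards land in a fresh
    // container and nothing recorded during the flush can be lost.
    func (s *memStore) popAll() []Stat {
        s.mu.Lock()
        defer s.mu.Unlock()
        popped := make([]Stat, 0, len(s.mu.stats))
        for _, st := range s.mu.stats {
            popped = append(popped, st)
        }
        s.mu.stats = make(map[string]Stat)
        return popped
    }

    // flush persists the local copy outside the lock; a slow persist step can
    // no longer race with clearing the in-memory storage.
    func (s *memStore) flush(persist func(Stat) error) {
        for _, st := range s.popAll() {
            _ = persist(st) // errors would be logged rather than propagated
        }
    }

The key point is that copying and clearing happen under one lock acquisition,
while the (potentially slow) persist step runs entirely outside of it.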

Release note: None
koorosh committed Dec 14, 2023
1 parent 3771f53 commit 478a1a3
Showing 4 changed files with 166 additions and 77 deletions.
80 changes: 23 additions & 57 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
@@ -14,15 +14,13 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -41,15 +39,10 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval))

// Handle wiping in-memory stats here, we only wipe in-memory stats under 2
// circumstances:
// 1. flush is enabled, and we are not early aborting the flush due to flushing
// too frequently.
// 2. flush is disabled, but we allow discard in-memory stats when disabled.
shouldWipeInMemoryStats := enabled && !flushingTooSoon
shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled)

if shouldWipeInMemoryStats {
// Resetting stats is performed individually for statement and transaction stats
// within the SQLStats.ConsumeStats function. Here, we reset stats only when
// the SQL stats flush is disabled.
if !enabled && allowDiscardWhenDisabled {
defer func() {
if err := s.SQLStats.Reset(ctx); err != nil {
log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
@@ -88,20 +81,25 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
if limitReached {
log.Infof(ctx, "unable to flush fingerprints because table limit was reached.")
} else {
var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
s.flushStmtStats(ctx, aggregatedTs)
}()

go func() {
defer wg.Done()
s.flushTxnStats(ctx, aggregatedTs)
}()

wg.Wait()
s.SQLStats.ConsumeStats(ctx,
func(ctx context.Context, statistics *appstatspb.CollectedStatementStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs)
}, "failed to flush statement statistics" /* errMsg */)

return nil
},
func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs)
}, "failed to flush transaction statistics" /* errMsg */)

return nil
})

if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil {
s.cfg.Knobs.OnStmtStatsFlushFinished()
}
}
}

@@ -153,38 +151,6 @@ func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, error) {
return isSizeLimitReached, nil
}

func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
// s.doFlush directly logs errors if they are encountered. Therefore,
// no error is returned here.
_ = s.SQLStats.IterateStatementStats(ctx, sqlstats.IteratorOptions{},
func(ctx context.Context, statistics *appstatspb.CollectedStatementStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs)
}, "failed to flush statement statistics" /* errMsg */)

return nil
})

if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil {
s.cfg.Knobs.OnStmtStatsFlushFinished()
}
}

func (s *PersistedSQLStats) flushTxnStats(ctx context.Context, aggregatedTs time.Time) {
_ = s.SQLStats.IterateTransactionStats(ctx, sqlstats.IteratorOptions{},
func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs)
}, "failed to flush transaction statistics" /* errMsg */)

return nil
})

if s.cfg.Knobs != nil && s.cfg.Knobs.OnTxnStatsFlushFinished != nil {
s.cfg.Knobs.OnTxnStatsFlushFinished()
}
}

func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) {
var err error
flushBegin := s.getTimeNow()
48 changes: 28 additions & 20 deletions pkg/sql/sqlstats/sslocal/sql_stats.go
@@ -151,33 +151,41 @@ func (s *SQLStats) resetAndMaybeDumpStats(ctx context.Context, target Sink) (err error) {
// accumulate data using that until it closes (or changes its
// application_name).
for appName, statsContainer := range s.mu.apps {
// Save the existing data to logs.
// TODO(knz/dt): instead of dumping the stats to the log, save
// them in a SQL table so they can be inspected by the DBA and/or
// the UI.
if sqlstats.DumpStmtStatsToLogBeforeReset.Get(&s.st.SV) {
statsContainer.SaveToLog(ctx, appName)
}

if target != nil {
lastErr := target.AddAppStats(ctx, appName, statsContainer)
// If we run out of memory budget, Container.Add() will merge stats in
// statsContainer with all the existing stats. However it will discard
// rest of the stats in statsContainer that requires memory allocation.
// We do not wish to short circuit here because we want to still try our
// best to merge all the stats that we can.
if lastErr != nil {
err = lastErr
}
}

lastErr := s.MaybeDumpStatsToLog(ctx, appName, statsContainer, target)
statsContainer.Clear(ctx)
err = lastErr
}
s.mu.lastReset = timeutil.Now()

return err
}

// MaybeDumpStatsToLog flushes stats into target if it is not nil.
func (s *SQLStats) MaybeDumpStatsToLog(
ctx context.Context, appName string, container *ssmemstorage.Container, target Sink,
) (err error) {
// Save the existing data to logs.
// TODO(knz/dt): instead of dumping the stats to the log, save
// them in a SQL table so they can be inspected by the DBA and/or
// the UI.
if sqlstats.DumpStmtStatsToLogBeforeReset.Get(&s.st.SV) {
container.SaveToLog(ctx, appName)
}

if target != nil {
lastErr := target.AddAppStats(ctx, appName, container)
// If we run out of memory budget, Container.Add() will merge stats in
// statsContainer with all the existing stats. However it will discard
// rest of the stats in statsContainer that requires memory allocation.
// We do not wish to short circuit here because we want to still try our
// best to merge all the stats that we can.
if lastErr != nil {
err = lastErr
}
}
return err
}

func (s *SQLStats) GetClusterSettings() *cluster.Settings {
return s.st
}
49 changes: 49 additions & 0 deletions pkg/sql/sqlstats/sslocal/sslocal_provider.go
@@ -13,6 +13,7 @@ package sslocal
import (
"context"
"sort"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -140,6 +141,54 @@ func (s *SQLStats) IterateStatementStats(
return nil
}

// ConsumeStats retrieves stats from the in-memory storage and then iterates over them,
// calling stmtVisitor and txnVisitor on statement and transaction stats respectively.
func (s *SQLStats) ConsumeStats(
ctx context.Context,
stmtVisitor sqlstats.StatementVisitor,
txnVisitor sqlstats.TransactionVisitor,
) {
apps := s.getAppNames(false)
for _, app := range apps {
app := app
container := s.GetApplicationStats(app, true).(*ssmemstorage.Container)
if err := s.MaybeDumpStatsToLog(ctx, app, container, s.flushTarget); err != nil {
log.Warningf(ctx, "failed to dump stats to log, %s", err.Error())
}
stmtStats := container.PopAllStatementsStats()
txnStats := container.PopAllTransactionStats()
container.Free(ctx)

// Iterate over the collected stats that have already been cleared from the in-memory stats and persist them to
// the system statement|transaction_statistics tables.
// The in-memory stats storage is not locked here, so it is safe to call the stmtVisitor or txnVisitor functions
// even though they might be time-consuming operations.
var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
for _, stat := range stmtStats {
stat := stat
if err := stmtVisitor(ctx, stat); err != nil {
log.Warningf(ctx, "failed to consume statement statistics, %s", err.Error())
}
}
}()

go func() {
defer wg.Done()
for _, stat := range txnStats {
stat := stat
if err := txnVisitor(ctx, stat); err != nil {
log.Warningf(ctx, "failed to consume transaction statistics, %s", err.Error())
}
}
}()
wg.Wait()
}
}

// StmtStatsIterator returns an instance of sslocal.StmtStatsIterator for
// the current SQLStats.
func (s *SQLStats) StmtStatsIterator(options sqlstats.IteratorOptions) StmtStatsIterator {
66 changes: 66 additions & 0 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
@@ -594,6 +594,72 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) {
log.Infof(ctx, "statistics for %q:\n%s", appName, buf.String())
}

// PopAllStatementsStats returns all statement stats collected in memory to the caller and clears the SQL stats to
// make sure that newly arriving stats won't interfere with the existing ones.
func (s *Container) PopAllStatementsStats() []*appstatspb.CollectedStatementStatistics {
s.mu.Lock()
defer s.mu.Unlock()
stats := make([]*appstatspb.CollectedStatementStatistics, 0)
if len(s.mu.stmts) == 0 {
return stats
}
for key, stmt := range s.mu.stmts {
stmt.mu.Lock()
data := stmt.mu.data
distSQLUsed := stmt.mu.distSQLUsed
vectorized := stmt.mu.vectorized
fullScan := stmt.mu.fullScan
database := stmt.mu.database
querySummary := stmt.mu.querySummary
stmt.mu.Unlock()

stats = append(stats, &appstatspb.CollectedStatementStatistics{
Key: appstatspb.StatementStatisticsKey{
Query: key.stmtNoConstants,
QuerySummary: querySummary,
DistSQL: distSQLUsed,
Vec: vectorized,
ImplicitTxn: key.implicitTxn,
FullScan: fullScan,
Failed: key.failed,
App: s.appName,
Database: database,
PlanHash: key.planHash,
TransactionFingerprintID: key.transactionFingerprintID,
},
ID: constructStatementFingerprintIDFromStmtKey(key),
Stats: data,
})
}
// Reset stats after they're collected. s.Reset is not called here in order to a) ensure that only statement
// stats are cleared, and b) clear them under the same lock so there's no chance that new stats are added in between.
s.mu.stmts = make(map[stmtKey]*stmtStats, len(s.mu.stmts)/2)
s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.mu.sampledPlanMetadataCache)/2)
return stats
}

// PopAllTransactionStats returns all transaction stats collected in memory to the caller and clears the SQL stats to
// make sure that newly arriving stats won't interfere with the existing ones.
func (t *Container) PopAllTransactionStats() []*appstatspb.CollectedTransactionStatistics {
t.mu.Lock()
defer t.mu.Unlock()
stats := make([]*appstatspb.CollectedTransactionStatistics, 0)
if len(t.mu.txns) == 0 {
return stats
}
for key := range t.mu.txns {
txnStats, _, _ := t.getStatsForTxnWithKeyLocked(key, nil /* stmtFingerprintIDs */, false /* createIfNonexistent */)
stats = append(stats, &appstatspb.CollectedTransactionStatistics{
StatementFingerprintIDs: txnStats.statementFingerprintIDs,
App: t.appName,
Stats: txnStats.mu.data,
TransactionFingerprintID: key,
})
}
t.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats, len(t.mu.txns)/2)
return stats
}

// Clear clears the data stored in this Container and prepare the Container
// for reuse.
func (s *Container) Clear(ctx context.Context) {
