From 2afef14e6717499c16ed0145a5f8fcc370ed1433 Mon Sep 17 00:00:00 2001 From: Andrii Vorobiov Date: Thu, 14 Dec 2023 16:25:02 +0200 Subject: [PATCH] sql: atomic flushing of sql stats This change refactors the existing logic for flushing SQL stats. The main goal of this change is to fix an issue where flushing and clearing in-memory stats are not a single atomic operation, which may cause stats that have not yet been persisted to be cleared unintentionally. This change does the following: 1. Introduces the `PopAllStatementsStats` function (and its transaction counterpart, `PopAllTransactionStats`), which prepares stats to be persisted and then clears the in-memory stats as an atomic operation. 2. The process of flushing stats is now as follows: - pop all stats from in-memory storage; - reset in-memory stats; - persist the local copy of the stats. 3. Previously, the process was as follows: - iterate over in-memory stats, which could be updated during iteration; - persisting stats could take some time, making the iteration over stats slow; - after all stats were flushed, in-memory storage was cleared, but there was no guarantee that nothing had been added to SQL stats in the meantime. The new implementation does have a disadvantage: it might cause glitches, because between popping stats from in-memory storage and persisting them the user might not see up-to-date information. It is assumed that this is better than permanently losing statistics. 
Release note: None --- pkg/sql/sqlstats/persistedsqlstats/flush.go | 80 ++++++------------- pkg/sql/sqlstats/sslocal/sql_stats.go | 50 +++++++----- pkg/sql/sqlstats/sslocal/sslocal_provider.go | 45 +++++++++++ .../sqlstats/ssmemstorage/ss_mem_storage.go | 66 +++++++++++++++ 4 files changed, 164 insertions(+), 77 deletions(-) diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index d14bfadc6b5a..635946c80fa5 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "math/rand" - "sync" "time" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" @@ -22,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -41,15 +39,10 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) { enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval)) - // Handle wiping in-memory stats here, we only wipe in-memory stats under 2 - // circumstances: - // 1. flush is enabled, and we are not early aborting the flush due to flushing - // too frequently. - // 2. flush is disabled, but we allow discard in-memory stats when disabled. - shouldWipeInMemoryStats := enabled && !flushingTooSoon - shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled) - - if shouldWipeInMemoryStats { + // Reset stats is performed individually for statement and transaction stats + // within SQLStats.ConsumeStats function. Here, we reset stats only when + // sql stats flush is disabled. 
+ if !enabled && allowDiscardWhenDisabled { defer func() { if err := s.SQLStats.Reset(ctx); err != nil { log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) @@ -88,20 +81,25 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) { if limitReached { log.Infof(ctx, "unable to flush fingerprints because table limit was reached.") } else { - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - s.flushStmtStats(ctx, aggregatedTs) - }() - - go func() { - defer wg.Done() - s.flushTxnStats(ctx, aggregatedTs) - }() - - wg.Wait() + s.SQLStats.ConsumeStats(ctx, + func(ctx context.Context, statistics *appstatspb.CollectedStatementStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs) + }, "failed to flush statement statistics" /* errMsg */) + + return nil + }, + func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs) + }, "failed to flush transaction statistics" /* errMsg */) + + return nil + }) + + if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil { + s.cfg.Knobs.OnStmtStatsFlushFinished() + } } } @@ -153,38 +151,6 @@ func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, er return isSizeLimitReached, nil } -func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) { - // s.doFlush directly logs errors if they are encountered. Therefore, - // no error is returned here. 
- _ = s.SQLStats.IterateStatementStats(ctx, sqlstats.IteratorOptions{}, - func(ctx context.Context, statistics *appstatspb.CollectedStatementStatistics) error { - s.doFlush(ctx, func() error { - return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs) - }, "failed to flush statement statistics" /* errMsg */) - - return nil - }) - - if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil { - s.cfg.Knobs.OnStmtStatsFlushFinished() - } -} - -func (s *PersistedSQLStats) flushTxnStats(ctx context.Context, aggregatedTs time.Time) { - _ = s.SQLStats.IterateTransactionStats(ctx, sqlstats.IteratorOptions{}, - func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error { - s.doFlush(ctx, func() error { - return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs) - }, "failed to flush transaction statistics" /* errMsg */) - - return nil - }) - - if s.cfg.Knobs != nil && s.cfg.Knobs.OnTxnStatsFlushFinished != nil { - s.cfg.Knobs.OnTxnStatsFlushFinished() - } -} - func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) { var err error flushBegin := s.getTimeNow() diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go index 210643ff4da4..e960b6c51820 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats.go @@ -151,33 +151,43 @@ func (s *SQLStats) resetAndMaybeDumpStats(ctx context.Context, target Sink) (err // accumulate data using that until it closes (or changes its // application_name). for appName, statsContainer := range s.mu.apps { - // Save the existing data to logs. - // TODO(knz/dt): instead of dumping the stats to the log, save - // them in a SQL table so they can be inspected by the DBA and/or - // the UI. 
- if sqlstats.DumpStmtStatsToLogBeforeReset.Get(&s.st.SV) { - statsContainer.SaveToLog(ctx, appName) - } - - if target != nil { - lastErr := target.AddAppStats(ctx, appName, statsContainer) - // If we run out of memory budget, Container.Add() will merge stats in - // statsContainer with all the existing stats. However it will discard - // rest of the stats in statsContainer that requires memory allocation. - // We do not wish to short circuit here because we want to still try our - // best to merge all the stats that we can. - if lastErr != nil { - err = lastErr - } - } - + lastErr := s.MaybeDumpStatsToLog(ctx, appName, statsContainer, target) statsContainer.Clear(ctx) + err = lastErr } s.mu.lastReset = timeutil.Now() return err } +// MaybeDumpStatsToLog flushes stats into target If it is not nil. +func (s *SQLStats) MaybeDumpStatsToLog( + ctx context.Context, + appName string, + container *ssmemstorage.Container, + target Sink) (err error) { + // Save the existing data to logs. + // TODO(knz/dt): instead of dumping the stats to the log, save + // them in a SQL table so they can be inspected by the DBA and/or + // the UI. + if sqlstats.DumpStmtStatsToLogBeforeReset.Get(&s.st.SV) { + container.SaveToLog(ctx, appName) + } + + if target != nil { + lastErr := target.AddAppStats(ctx, appName, container) + // If we run out of memory budget, Container.Add() will merge stats in + // statsContainer with all the existing stats. However it will discard + // rest of the stats in statsContainer that requires memory allocation. + // We do not wish to short circuit here because we want to still try our + // best to merge all the stats that we can. 
+ if lastErr != nil { + err = lastErr + } + } + return err +} + func (s *SQLStats) GetClusterSettings() *cluster.Settings { return s.st } diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go index d9535c90f0e2..7975c2eef2dd 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go @@ -13,6 +13,7 @@ package sslocal import ( "context" "sort" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -140,6 +141,50 @@ func (s *SQLStats) IterateStatementStats( return nil } +// ConsumeStats leverages the process of retrieving stats from in-memory storage and then iterating over them +// calling stmtVisitor and txnVisitor on statement and transaction stats respectively. +func (s *SQLStats) ConsumeStats(ctx context.Context, stmtVisitor sqlstats.StatementVisitor, txnVisitor sqlstats.TransactionVisitor) { + apps := s.getAppNames(false) + for _, app := range apps { + app := app + container := s.GetApplicationStats(app, true).(*ssmemstorage.Container) + if err := s.MaybeDumpStatsToLog(ctx, app, container, s.flushTarget); err != nil { + log.Warningf(ctx, "failed to dump stats to log, %s", err.Error()) + } + stmtStats := container.PopAllStatementsStats() + txnStats := container.PopAllTransactionStats() + container.Free(ctx) + + // Iterate over collected stats that have been already cleared from in-memory stats and persist them + // the system statement|transaction_statistics tables. + // In-memory stats storage is not locked here and it is safe to call stmtVisitor or txnVisitor functions + // that might be time consuming operations. 
+ var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for _, stat := range stmtStats { + stat := stat + if err := stmtVisitor(ctx, stat); err != nil { + log.Warningf(ctx, "failed to consume statement statistics, %s", err.Error()) + } + } + }() + + go func() { + defer wg.Done() + for _, stat := range txnStats { + stat := stat + if err := txnVisitor(ctx, stat); err != nil { + log.Warningf(ctx, "failed to consume transaction statistics, %s", err.Error()) + } + } + }() + wg.Wait() + } +} + // StmtStatsIterator returns an instance of sslocal.StmtStatsIterator for // the current SQLStats. func (s *SQLStats) StmtStatsIterator(options sqlstats.IteratorOptions) StmtStatsIterator { diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index 8980a7d5d274..4092a684c879 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -594,6 +594,72 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) { log.Infof(ctx, "statistics for %q:\n%s", appName, buf.String()) } +// PopAllStatementsStats returns all collected statement stats in memory to the caller and clears SQL stats +// make sure that new arriving stats won't be interfering with existing one. 
+func (s *Container) PopAllStatementsStats() []*appstatspb.CollectedStatementStatistics { + s.mu.Lock() + defer s.mu.Unlock() + stats := make([]*appstatspb.CollectedStatementStatistics, 0) + if len(s.mu.stmts) == 0 { + return stats + } + for key, stmt := range s.mu.stmts { + stmt.mu.Lock() + data := stmt.mu.data + distSQLUsed := stmt.mu.distSQLUsed + vectorized := stmt.mu.vectorized + fullScan := stmt.mu.fullScan + database := stmt.mu.database + querySummary := stmt.mu.querySummary + stmt.mu.Unlock() + + stats = append(stats, &appstatspb.CollectedStatementStatistics{ + Key: appstatspb.StatementStatisticsKey{ + Query: key.stmtNoConstants, + QuerySummary: querySummary, + DistSQL: distSQLUsed, + Vec: vectorized, + ImplicitTxn: key.implicitTxn, + FullScan: fullScan, + Failed: key.failed, + App: s.appName, + Database: database, + PlanHash: key.planHash, + TransactionFingerprintID: key.transactionFingerprintID, + }, + ID: constructStatementFingerprintIDFromStmtKey(key), + Stats: data, + }) + } + // Reset stats after they're collected. s.Reset is not called here to a) ensure that only statement + // stats are cleared b) it is cleared under the same lock so there's no chance that new stats are added. + s.mu.stmts = make(map[stmtKey]*stmtStats, len(s.mu.stmts)/2) + s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.mu.sampledPlanMetadataCache)/2) + return stats +} + +// PopAllTransactionStats returns all collected transaction stats in memory to the caller and clears SQL stats +// make sure that new arriving stats won't be interfering with existing one. 
+func (t *Container) PopAllTransactionStats() []*appstatspb.CollectedTransactionStatistics { + t.mu.Lock() + defer t.mu.Unlock() + stats := make([]*appstatspb.CollectedTransactionStatistics, 0) + if len(t.mu.txns) == 0 { + return stats + } + for key := range t.mu.txns { + txnStats, _, _ := t.getStatsForTxnWithKeyLocked(key, nil /* stmtFingerprintIDs */, false /* createIfNonexistent */) + stats = append(stats, &appstatspb.CollectedTransactionStatistics{ + StatementFingerprintIDs: txnStats.statementFingerprintIDs, + App: t.appName, + Stats: txnStats.mu.data, + TransactionFingerprintID: key, + }) + } + t.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats, len(t.mu.txns)/2) + return stats +} + // Clear clears the data stored in this Container and prepare the Container // for reuse. func (s *Container) Clear(ctx context.Context) {