diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index d14bfadc6b5a..635946c80fa5 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "math/rand" - "sync" "time" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" @@ -22,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -41,15 +39,10 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) { enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval)) - // Handle wiping in-memory stats here, we only wipe in-memory stats under 2 - // circumstances: - // 1. flush is enabled, and we are not early aborting the flush due to flushing - // too frequently. - // 2. flush is disabled, but we allow discard in-memory stats when disabled. - shouldWipeInMemoryStats := enabled && !flushingTooSoon - shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled) - - if shouldWipeInMemoryStats { + // Reset stats is performed individually for statement and transaction stats + // within SQLStats.ConsumeStats function. Here, we reset stats only when + // sql stats flush is disabled. + if !enabled && allowDiscardWhenDisabled { defer func() { if err := s.SQLStats.Reset(ctx); err != nil { log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) @@ -88,20 +81,25 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) { if limitReached { log.Infof(ctx, "unable to flush fingerprints because table limit was reached.") } else { - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - s.flushStmtStats(ctx, aggregatedTs) - }() - - go func() { - defer wg.Done() - s.flushTxnStats(ctx, aggregatedTs) - }() - - wg.Wait() + s.SQLStats.ConsumeStats(ctx, + func(ctx context.Context, statistics *appstatspb.CollectedStatementStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs) + }, "failed to flush statement statistics" /* errMsg */) + + return nil + }, + func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs) + }, "failed to flush transaction statistics" /* errMsg */) + + return nil + }) + + if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil { + s.cfg.Knobs.OnStmtStatsFlushFinished() + } } } @@ -153,38 +151,6 @@ func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, er return isSizeLimitReached, nil } -func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) { - // s.doFlush directly logs errors if they are encountered. Therefore, - // no error is returned here. - _ = s.SQLStats.IterateStatementStats(ctx, sqlstats.IteratorOptions{}, - func(ctx context.Context, statistics *appstatspb.CollectedStatementStatistics) error { - s.doFlush(ctx, func() error { - return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs) - }, "failed to flush statement statistics" /* errMsg */) - - return nil - }) - - if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil { - s.cfg.Knobs.OnStmtStatsFlushFinished() - } -} - -func (s *PersistedSQLStats) flushTxnStats(ctx context.Context, aggregatedTs time.Time) { - _ = s.SQLStats.IterateTransactionStats(ctx, sqlstats.IteratorOptions{}, - func(ctx context.Context, statistics *appstatspb.CollectedTransactionStatistics) error { - s.doFlush(ctx, func() error { - return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs) - }, "failed to flush transaction statistics" /* errMsg */) - - return nil - }) - - if s.cfg.Knobs != nil && s.cfg.Knobs.OnTxnStatsFlushFinished != nil { - s.cfg.Knobs.OnTxnStatsFlushFinished() - } -} - func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) { var err error flushBegin := s.getTimeNow() diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go index 210643ff4da4..e960b6c51820 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats.go @@ -151,33 +151,43 @@ func (s *SQLStats) resetAndMaybeDumpStats(ctx context.Context, target Sink) (err // accumulate data using that until it closes (or changes its // application_name). for appName, statsContainer := range s.mu.apps { - // Save the existing data to logs. - // TODO(knz/dt): instead of dumping the stats to the log, save - // them in a SQL table so they can be inspected by the DBA and/or - // the UI. - if sqlstats.DumpStmtStatsToLogBeforeReset.Get(&s.st.SV) { - statsContainer.SaveToLog(ctx, appName) - } - - if target != nil { - lastErr := target.AddAppStats(ctx, appName, statsContainer) - // If we run out of memory budget, Container.Add() will merge stats in - // statsContainer with all the existing stats. However it will discard - // rest of the stats in statsContainer that requires memory allocation. - // We do not wish to short circuit here because we want to still try our - // best to merge all the stats that we can. - if lastErr != nil { - err = lastErr - } - } - + lastErr := s.MaybeDumpStatsToLog(ctx, appName, statsContainer, target) statsContainer.Clear(ctx) + err = lastErr } s.mu.lastReset = timeutil.Now() return err } +// MaybeDumpStatsToLog flushes stats into target If it is not nil. +func (s *SQLStats) MaybeDumpStatsToLog( + ctx context.Context, + appName string, + container *ssmemstorage.Container, + target Sink) (err error) { + // Save the existing data to logs. + // TODO(knz/dt): instead of dumping the stats to the log, save + // them in a SQL table so they can be inspected by the DBA and/or + // the UI. + if sqlstats.DumpStmtStatsToLogBeforeReset.Get(&s.st.SV) { + container.SaveToLog(ctx, appName) + } + + if target != nil { + lastErr := target.AddAppStats(ctx, appName, container) + // If we run out of memory budget, Container.Add() will merge stats in + // statsContainer with all the existing stats. However it will discard + // rest of the stats in statsContainer that requires memory allocation. + // We do not wish to short circuit here because we want to still try our + // best to merge all the stats that we can. + if lastErr != nil { + err = lastErr + } + } + return err +} + func (s *SQLStats) GetClusterSettings() *cluster.Settings { return s.st } diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go index d9535c90f0e2..7975c2eef2dd 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go @@ -13,6 +13,7 @@ package sslocal import ( "context" "sort" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -140,6 +141,50 @@ func (s *SQLStats) IterateStatementStats( return nil } +// ConsumeStats leverages the process of retrieving stats from in-memory storage and then iterating over them +// calling stmtVisitor and txnVisitor on statement and transaction stats respectively. +func (s *SQLStats) ConsumeStats(ctx context.Context, stmtVisitor sqlstats.StatementVisitor, txnVisitor sqlstats.TransactionVisitor) { + apps := s.getAppNames(false) + for _, app := range apps { + app := app + container := s.GetApplicationStats(app, true).(*ssmemstorage.Container) + if err := s.MaybeDumpStatsToLog(ctx, app, container, s.flushTarget); err != nil { + log.Warningf(ctx, "failed to dump stats to log, %s", err.Error()) + } + stmtStats := container.PopAllStatementsStats() + txnStats := container.PopAllTransactionStats() + container.Free(ctx) + + // Iterate over collected stats that have been already cleared from in-memory stats and persist them + // the system statement|transaction_statistics tables. + // In-memory stats storage is not locked here and it is safe to call stmtVisitor or txnVisitor functions + // that might be time consuming operations. + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for _, stat := range stmtStats { + stat := stat + if err := stmtVisitor(ctx, stat); err != nil { + log.Warningf(ctx, "failed to consume statement statistics, %s", err.Error()) + } + } + }() + + go func() { + defer wg.Done() + for _, stat := range txnStats { + stat := stat + if err := txnVisitor(ctx, stat); err != nil { + log.Warningf(ctx, "failed to consume transaction statistics, %s", err.Error()) + } + } + }() + wg.Wait() + } +} + // StmtStatsIterator returns an instance of sslocal.StmtStatsIterator for // the current SQLStats. func (s *SQLStats) StmtStatsIterator(options sqlstats.IteratorOptions) StmtStatsIterator { diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index 8980a7d5d274..4092a684c879 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -594,6 +594,72 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) { log.Infof(ctx, "statistics for %q:\n%s", appName, buf.String()) } +// PopAllStatementsStats returns all collected statement stats in memory to the caller and clears SQL stats +// make sure that new arriving stats won't be interfering with existing one. +func (s *Container) PopAllStatementsStats() []*appstatspb.CollectedStatementStatistics { + s.mu.Lock() + defer s.mu.Unlock() + stats := make([]*appstatspb.CollectedStatementStatistics, 0) + if len(s.mu.stmts) == 0 { + return stats + } + for key, stmt := range s.mu.stmts { + stmt.mu.Lock() + data := stmt.mu.data + distSQLUsed := stmt.mu.distSQLUsed + vectorized := stmt.mu.vectorized + fullScan := stmt.mu.fullScan + database := stmt.mu.database + querySummary := stmt.mu.querySummary + stmt.mu.Unlock() + + stats = append(stats, &appstatspb.CollectedStatementStatistics{ + Key: appstatspb.StatementStatisticsKey{ + Query: key.stmtNoConstants, + QuerySummary: querySummary, + DistSQL: distSQLUsed, + Vec: vectorized, + ImplicitTxn: key.implicitTxn, + FullScan: fullScan, + Failed: key.failed, + App: s.appName, + Database: database, + PlanHash: key.planHash, + TransactionFingerprintID: key.transactionFingerprintID, + }, + ID: constructStatementFingerprintIDFromStmtKey(key), + Stats: data, + }) + } + // Reset stats after they're collected. s.Reset is not called here to a) ensure that only statement + // stats are cleared b) it is cleared under the same lock so there's no chance that new stats are added. + s.mu.stmts = make(map[stmtKey]*stmtStats, len(s.mu.stmts)/2) + s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.mu.sampledPlanMetadataCache)/2) + return stats +} + +// PopAllTransactionStats returns all collected transaction stats in memory to the caller and clears SQL stats +// make sure that new arriving stats won't be interfering with existing one. +func (t *Container) PopAllTransactionStats() []*appstatspb.CollectedTransactionStatistics { + t.mu.Lock() + defer t.mu.Unlock() + stats := make([]*appstatspb.CollectedTransactionStatistics, 0) + if len(t.mu.txns) == 0 { + return stats + } + for key := range t.mu.txns { + txnStats, _, _ := t.getStatsForTxnWithKeyLocked(key, nil /* stmtFingerprintIDs */, false /* createIfNonexistent */) + stats = append(stats, &appstatspb.CollectedTransactionStatistics{ + StatementFingerprintIDs: txnStats.statementFingerprintIDs, + App: t.appName, + Stats: txnStats.mu.data, + TransactionFingerprintID: key, + }) + } + t.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats, len(t.mu.txns)/2) + return stats +} + // Clear clears the data stored in this Container and prepare the Container // for reuse. func (s *Container) Clear(ctx context.Context) {