diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go
index 03112e05fe..9c43cab2b9 100644
--- a/go/cmd/dolt/commands/engine/sqlengine.go
+++ b/go/cmd/dolt/commands/engine/sqlengine.go
@@ -41,6 +41,7 @@ import (
 	dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
 	dblr "github.com/dolthub/dolt/go/libraries/doltcore/sqle/binlogreplication"
 	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/cluster"
+	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
 	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
 	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/kvexec"
 	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler"
@@ -81,6 +82,7 @@ type SqlEngineConfig struct {
 	JwksConfig              []servercfg.JwksConfig
 	SystemVariables         SystemVariables
 	ClusterController       *cluster.Controller
+	AutoGCController        *dsqle.AutoGCController
 	BinlogReplicaController binlogreplication.BinlogReplicaController
 	EventSchedulerStatus    eventscheduler.SchedulerStatus
 }
@@ -115,7 +117,15 @@ func NewSqlEngine(
 		return nil, err
 	}
 
-	all := dbs[:]
+	// Make a copy of the databases. |all| will be provided to the
+	// dsqle DatabaseProvider as the set of all initial databases.
+	// |dbs| holds only the databases that came from MultiRepoEnv,
+	// and they are all real databases backed by DoltDB instances.
+	// |all| may additionally include informational extension
+	// databases, such as |dolt_cluster|, depending on the server
+	// config.
+	all := make([]dsess.SqlDatabase, len(dbs))
+	copy(all, dbs)
 
 	// this is overwritten only for server sessions
 	for _, db := range dbs {
@@ -194,6 +204,21 @@ func NewSqlEngine(
 	statsPro := statspro.NewProvider(pro, statsnoms.NewNomsStatsFactory(mrEnv.RemoteDialProvider()))
 	engine.Analyzer.Catalog.StatsProvider = statsPro
 
+	if config.AutoGCController != nil {
+		err = config.AutoGCController.RunBackgroundThread(bThreads, sqlEngine.NewDefaultContext)
+		if err != nil {
+			return nil, err
+		}
+		err = config.AutoGCController.ApplyCommitHooks(ctx, mrEnv, dbs...)
+		if err != nil {
+			return nil, err
+		}
+		pro.InitDatabaseHooks = append(pro.InitDatabaseHooks, config.AutoGCController.InitDatabaseHook())
+		pro.DropDatabaseHooks = append(pro.DropDatabaseHooks, config.AutoGCController.DropDatabaseHook())
+		// XXX: We force the session-aware safepoint controller when auto_gc is on.
+		dprocedures.UseSessionAwareSafepointController = true
+	}
+
 	engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{})
 	sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, gcSafepointController, config.Autocommit)
 	sqlEngine.provider = pro
diff --git a/go/cmd/dolt/commands/sqlserver/command_line_config.go b/go/cmd/dolt/commands/sqlserver/command_line_config.go
index 0d9bf9afdd..5d0d8cde29 100755
--- a/go/cmd/dolt/commands/sqlserver/command_line_config.go
+++ b/go/cmd/dolt/commands/sqlserver/command_line_config.go
@@ -489,6 +489,10 @@ func (cfg *commandLineServerConfig) ValueSet(value string) bool {
 	return ok
 }
 
+func (cfg *commandLineServerConfig) AutoGCBehavior() servercfg.AutoGCBehavior {
+	return stubAutoGCBehavior{}
+}
+
 // DoltServerConfigReader is the default implementation of ServerConfigReader suitable for parsing Dolt config files
 // and command line options.
type DoltServerConfigReader struct{}
@@ -510,3 +514,10 @@ func (d DoltServerConfigReader) ReadConfigFile(cwdFS filesys.Filesys, file strin
 func (d DoltServerConfigReader) ReadConfigArgs(args *argparser.ArgParseResults, dataDirOverride string) (servercfg.ServerConfig, error) {
 	return NewCommandLineConfig(nil, args, dataDirOverride)
 }
+
+type stubAutoGCBehavior struct {
+}
+
+func (stubAutoGCBehavior) Enable() bool {
+	return false
+}
diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go
index 33d253a377..dd6dd83784 100644
--- a/go/cmd/dolt/commands/sqlserver/server.go
+++ b/go/cmd/dolt/commands/sqlserver/server.go
@@ -257,6 +257,17 @@ func ConfigureServices(
 	}
 	controller.Register(InitEventSchedulerStatus)
 
+	InitAutoGCController := &svcs.AnonService{
+		InitF: func(context.Context) error {
+			if serverConfig.AutoGCBehavior() != nil &&
+				serverConfig.AutoGCBehavior().Enable() {
+				config.AutoGCController = sqle.NewAutoGCController(lgr)
+			}
+			return nil
+		},
+	}
+	controller.Register(InitAutoGCController)
+
 	var sqlEngine *engine.SqlEngine
 	InitSqlEngine := &svcs.AnonService{
 		InitF: func(ctx context.Context) (err error) {
diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go
index c995185fc3..d9e8597b9b 100644
--- a/go/libraries/doltcore/doltdb/doltdb.go
+++ b/go/libraries/doltcore/doltdb/doltdb.go
@@ -1917,23 +1917,81 @@ func (ddb *DoltDB) IsTableFileStore() bool {
 
 // ChunkJournal returns the ChunkJournal for this DoltDB, if one is in use.
 func (ddb *DoltDB) ChunkJournal() *nbs.ChunkJournal {
-	tableFileStore, ok := datas.ChunkStoreFromDatabase(ddb.db).(chunks.TableFileStore)
-	if !ok {
-		return nil
-	}
+	cs := datas.ChunkStoreFromDatabase(ddb.db)
 
-	generationalNbs, ok := tableFileStore.(*nbs.GenerationalNBS)
-	if !ok {
-		return nil
+	if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok {
+		cs = generationalNBS.NewGen()
 	}
 
-	newGen := generationalNbs.NewGen()
-	nbs, ok := newGen.(*nbs.NomsBlockStore)
-	if !ok {
+	if nbsStore, ok := cs.(*nbs.NomsBlockStore); ok {
+		return nbsStore.ChunkJournal()
+	} else {
 		return nil
 	}
+}
-
-	return nbs.ChunkJournal()
+// StoreSizes approximates how large the on-disk storage is for a DoltDB.
+type StoreSizes struct {
+	// For ChunkJournal stores, this will be the size of the journal file. A
+	// size of zero does not mean the store is not journaled: the store could
+	// be journaled and the journal could be empty.
+	JournalBytes uint64
+	// For generational stores, this will be the size of the new gen. It will
+	// include any JournalBytes. A size of zero does not mean the store is not
+	// generational, since it could be the case that the store is generational
+	// but everything in it is in the old gen. In practice, given how we build
+	// oldgen references today, this never happens: there is always a little
+	// bit of data that only goes in the newgen.
+	NewGenBytes uint64
+	// This is the approximate total on-disk storage overhead of the store.
+	// It includes JournalBytes and NewGenBytes, if there are any.
+ TotalBytes uint64 +} + +func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { + cs := datas.ChunkStoreFromDatabase(ddb.db) + if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok { + newgen := generationalNBS.NewGen() + newGenTFS, newGenTFSOk := newgen.(chunks.TableFileStore) + totalTFS, totalTFSOk := cs.(chunks.TableFileStore) + newGenNBS, newGenNBSOk := newgen.(*nbs.NomsBlockStore) + if !(newGenTFSOk && totalTFSOk && newGenNBSOk) { + return StoreSizes{}, fmt.Errorf("unexpected newgen or chunk store type for *nbs.GenerationalNBS instance; cannot take store sizes: cs: %T, newgen: %T", cs, newgen) + } + newgenSz, err := newGenTFS.Size(ctx) + if err != nil { + return StoreSizes{}, err + } + totalSz, err := totalTFS.Size(ctx) + if err != nil { + return StoreSizes{}, err + } + journal := newGenNBS.ChunkJournal() + if journal != nil { + return StoreSizes{ + JournalBytes: uint64(journal.Size()), + NewGenBytes: newgenSz, + TotalBytes: totalSz, + }, nil + } else { + return StoreSizes{ + NewGenBytes: newgenSz, + TotalBytes: totalSz, + }, nil + } + } else { + totalTFS, totalTFSOk := cs.(chunks.TableFileStore) + if !totalTFSOk { + return StoreSizes{}, fmt.Errorf("unexpected chunk store type for non-*nbs.GenerationalNBS ddb.db instance; cannot take store sizes: cs: %T", cs) + } + totalSz, err := totalTFS.Size(ctx) + if err != nil { + return StoreSizes{}, err + } + return StoreSizes{ + TotalBytes: totalSz, + }, nil + } } func (ddb *DoltDB) TableFileStoreHasJournal(ctx context.Context) (bool, error) { diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go index b3090e587c..40ae33fe60 100644 --- a/go/libraries/doltcore/remotesrv/grpc.go +++ b/go/libraries/doltcore/remotesrv/grpc.go @@ -36,6 +36,7 @@ import ( "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/nbs" "github.com/dolthub/dolt/go/store/types" ) @@ -97,7 +98,7 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -155,7 +156,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -233,7 +234,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore "num_requested": numHashes, "num_urls": numUrls, "num_ranges": numRanges, - }).Info("finished") + }).Trace("finished") }() logger := ologger @@ -387,7 +388,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() _, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -445,7 +446,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() _, err := rs.getStore(ctx, 
logger, repoPath) if err != nil { @@ -462,7 +463,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -485,7 +486,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -500,7 +501,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version())) if err != nil { logger.WithError(err).Error("error calling AddTableFilesToManifest") - return nil, status.Errorf(codes.Internal, "manifest update error: %v", err) + code := codes.Internal + if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) { + code = codes.FailedPrecondition + } + return nil, status.Errorf(code, "manifest update error: %v", err) } currHash := hash.New(req.Current) @@ -513,7 +518,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe "last_hash": lastHash.String(), "curr_hash": currHash.String(), }).Error("error calling Commit") - return nil, status.Errorf(codes.Internal, "failed to commit: %v", err) + code := codes.Internal + if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) { + code = codes.FailedPrecondition + } + return nil, status.Errorf(code, "failed to commit: %v", err) } logger.Tracef("Commit success; moved from %s -> %s", lastHash.String(), currHash.String()) @@ -528,7 +537,7 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getOrCreateStore(ctx, logger, repoPath, req.ClientRepoFormat.NbfVersion) if err != nil { @@ -556,7 +565,7 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. 
} repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -634,7 +643,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -649,7 +658,11 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version())) if err != nil { logger.WithError(err).Error("error occurred updating the manifest") - return nil, status.Error(codes.Internal, "manifest update error") + code := codes.Internal + if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) { + code = codes.FailedPrecondition + } + return nil, status.Error(code, "manifest update error") } logger = logger.WithFields(logrus.Fields{ @@ -707,7 +720,7 @@ func getReqLogger(lgr *logrus.Entry, method string) *logrus.Entry { "method": method, "request_num": strconv.Itoa(incReqId()), }) - lgr.Info("starting request") + lgr.Trace("starting request") return lgr } diff --git a/go/libraries/doltcore/remotesrv/http.go b/go/libraries/doltcore/remotesrv/http.go index 825f4ca0df..ba1ea046ee 100644 --- a/go/libraries/doltcore/remotesrv/http.go +++ b/go/libraries/doltcore/remotesrv/http.go @@ -66,7 +66,7 @@ func newFileHandler(lgr *logrus.Entry, dbCache DBCache, fs filesys.Filesys, read func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { logger := getReqLogger(fh.lgr, req.Method+"_"+req.RequestURI) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() var err error req.URL, err = fh.sealer.Unseal(req.URL) diff --git a/go/libraries/doltcore/servercfg/serverconfig.go b/go/libraries/doltcore/servercfg/serverconfig.go index e69ae23bab..79aa01c958 100644 --- a/go/libraries/doltcore/servercfg/serverconfig.go +++ b/go/libraries/doltcore/servercfg/serverconfig.go @@ -48,6 +48,7 @@ const ( DefaultReadOnly = false DefaultLogLevel = LogLevel_Info DefaultAutoCommit = true + DefaultAutoGCBehaviorEnable = false DefaultDoltTransactionCommit = false DefaultMaxConnections = 100 DefaultDataDir = "." @@ -198,6 +199,8 @@ type ServerConfig interface { EventSchedulerStatus() string // ValueSet returns whether the value string provided was explicitly set in the config ValueSet(value string) bool + // AutoGCBehavior defines parameters around how auto-GC works for the running server. + AutoGCBehavior() AutoGCBehavior } // DefaultServerConfig creates a `*ServerConfig` that has all of the options set to their default values. 
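A note on the grpc.go changes above: the errors.Is mapping from nbs.ErrDanglingRef / nbs.ErrTableFileNotFound to codes.FailedPrecondition now appears three times (twice in Commit, once in AddTableFiles). A sketch of a helper that could centralize it; this is hypothetical, not part of this diff, and statusCodeForManifestError is a made-up name:

package remotesrv

import (
	"errors"

	"google.golang.org/grpc/codes"

	"github.com/dolthub/dolt/go/store/nbs"
)

// statusCodeForManifestError maps storage errors the client can correct
// (dangling refs, missing table files) to FailedPrecondition, and
// everything else to Internal.
func statusCodeForManifestError(err error) codes.Code {
	if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
		return codes.FailedPrecondition
	}
	return codes.Internal
}

Each call site would then read: return nil, status.Errorf(statusCodeForManifestError(err), "manifest update error: %v", err).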
@@ -214,6 +217,9 @@ func defaultServerConfigYAML() *YAMLConfig {
 		ReadOnly:              ptr(DefaultReadOnly),
 		AutoCommit:            ptr(DefaultAutoCommit),
 		DoltTransactionCommit: ptr(DefaultDoltTransactionCommit),
+		AutoGCBehavior: &AutoGCBehaviorYAMLConfig{
+			Enable_: ptr(DefaultAutoGCBehaviorEnable),
+		},
 	},
 	UserConfig: UserYAMLConfig{
 		Name: ptr(""),
@@ -445,3 +451,7 @@ func CheckForUnixSocket(config ServerConfig) (string, bool, error) {
 
 	return "", false, nil
 }
+
+type AutoGCBehavior interface {
+	Enable() bool
+}
diff --git a/go/libraries/doltcore/servercfg/yaml_config.go b/go/libraries/doltcore/servercfg/yaml_config.go
index c5fc56b977..5f370f3350 100644
--- a/go/libraries/doltcore/servercfg/yaml_config.go
+++ b/go/libraries/doltcore/servercfg/yaml_config.go
@@ -65,6 +65,8 @@ type BehaviorYAMLConfig struct {
 	DoltTransactionCommit *bool `yaml:"dolt_transaction_commit,omitempty"`
 
 	EventSchedulerStatus *string `yaml:"event_scheduler,omitempty" minver:"1.17.0"`
+
+	AutoGCBehavior *AutoGCBehaviorYAMLConfig `yaml:"auto_gc_behavior,omitempty" minver:"TBD"`
 }
 
 // UserYAMLConfig contains server configuration regarding the user account clients must use to connect
@@ -176,6 +178,7 @@ func YamlConfigFromFile(fs filesys.Filesys, path string) (ServerConfig, error) {
 
 func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig {
 	systemVars := cfg.SystemVars()
+	autoGCBehavior := toAutoGCBehaviorYAML(cfg.AutoGCBehavior())
 	return &YAMLConfig{
 		LogLevelStr:       ptr(string(cfg.LogLevel())),
 		MaxQueryLenInLogs: nillableIntPtr(cfg.MaxLoggedQueryLen()),
@@ -186,6 +189,7 @@ func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig {
 			DisableClientMultiStatements: ptr(cfg.DisableClientMultiStatements()),
 			DoltTransactionCommit:        ptr(cfg.DoltTransactionCommit()),
 			EventSchedulerStatus:         ptr(cfg.EventSchedulerStatus()),
+			AutoGCBehavior:               autoGCBehavior,
 		},
 		ListenerConfig: ListenerYAMLConfig{
 			HostStr: ptr(cfg.Host()),
@@ -817,6 +821,13 @@ func (cfg YAMLConfig) ClusterConfig() ClusterConfig {
 	return cfg.ClusterCfg
 }
 
+func (cfg YAMLConfig) AutoGCBehavior() AutoGCBehavior {
+	if cfg.BehaviorConfig.AutoGCBehavior == nil {
+		return nil
+	}
+	return cfg.BehaviorConfig.AutoGCBehavior
+}
+
 func (cfg YAMLConfig) EventSchedulerStatus() string {
 	if cfg.BehaviorConfig.EventSchedulerStatus == nil {
 		return "ON"
@@ -922,3 +933,23 @@ func (cfg YAMLConfig) ValueSet(value string) bool {
 	}
 	return false
 }
+
+type AutoGCBehaviorYAMLConfig struct {
+	Enable_ *bool `yaml:"enable,omitempty" minver:"TBD"`
+}
+
+func (a *AutoGCBehaviorYAMLConfig) Enable() bool {
+	if a.Enable_ == nil {
+		return false
+	}
+	return *a.Enable_
+}
+
+func toAutoGCBehaviorYAML(a AutoGCBehavior) *AutoGCBehaviorYAMLConfig {
+	// YAMLConfig.AutoGCBehavior() returns nil when no auto_gc_behavior
+	// stanza is present; guard against calling Enable() on a nil value.
+	if a == nil {
+		return nil
+	}
+	return &AutoGCBehaviorYAMLConfig{
+		Enable_: ptr(a.Enable()),
+	}
+}
diff --git a/go/libraries/doltcore/servercfg/yaml_config_test.go b/go/libraries/doltcore/servercfg/yaml_config_test.go
index 76dd3f21f3..5f6dd64c62 100644
--- a/go/libraries/doltcore/servercfg/yaml_config_test.go
+++ b/go/libraries/doltcore/servercfg/yaml_config_test.go
@@ -34,6 +34,8 @@ behavior:
   dolt_transaction_commit: true
   disable_client_multi_statements: false
   event_scheduler: ON
+  auto_gc_behavior:
+    enable: false
 
 listener:
   host: localhost
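Between the config plumbing above and the implementation below, it is worth showing the user-facing shape of the new setting. A minimal server.yaml sketch, mirroring the fixture in yaml_config_test.go and assuming the minver "TBD" placeholders are finalized before release:

behavior:
  auto_gc_behavior:
    enable: true

diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go
new file mode 100644
index 0000000000..644a13cfe5
--- /dev/null
+++ b/go/libraries/doltcore/sqle/auto_gc.go
@@ -0,0 +1,348 @@
+// Copyright 2025 Dolthub, Inc.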
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqle
+
+import (
+	"context"
+	"errors"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/dolthub/go-mysql-server/sql"
+	"github.com/sirupsen/logrus"
+
+	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
+	"github.com/dolthub/dolt/go/libraries/doltcore/env"
+	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
+	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
+	"github.com/dolthub/dolt/go/store/chunks"
+	"github.com/dolthub/dolt/go/store/datas"
+	"github.com/dolthub/dolt/go/store/types"
+)
+
+// Auto GC is the ability of a running SQL server engine to perform
+// dolt_gc() behaviors periodically. If enabled, it currently works as
+// follows:
+//
+// An AutoGCController is created for a running SQL Engine. The
+// controller runs a background thread which only ever runs one
+// GC at a time. Post Commit Hooks are installed on every database in
+// the DoltDatabaseProvider for the SQL Engine. Those hooks check if
+// it is time to perform a GC for that particular database. If it is,
+// they forward a request to the background thread to register the
+// database as wanting a GC.
+
+type AutoGCController struct {
+	workCh chan autoGCWork
+	lgr    *logrus.Logger
+
+	mu      sync.Mutex
+	hooks   map[string]*autoGCCommitHook
+	ctxF    func(context.Context) (*sql.Context, error)
+	threads *sql.BackgroundThreads
+}
+
+func NewAutoGCController(lgr *logrus.Logger) *AutoGCController {
+	return &AutoGCController{
+		workCh: make(chan autoGCWork),
+		lgr:    lgr,
+		hooks:  make(map[string]*autoGCCommitHook),
+	}
+}
+
+// Passed by a commit hook to the auto-GC thread, requesting the
+// thread to dolt_gc |db|. When the GC is finished, |done| will be
+// closed. Signalling completion allows the commit hook to only
+// submit one dolt_gc request at a time.
+type autoGCWork struct {
+	db   *doltdb.DoltDB
+	done chan struct{}
+	name string // only for logging.
+}
+
+// During engine initialization, this should be called to ensure the
+// background worker threads responsible for performing the GC are
+// running.
+func (c *AutoGCController) RunBackgroundThread(threads *sql.BackgroundThreads, ctxF func(context.Context) (*sql.Context, error)) error {
+	c.threads = threads
+	c.ctxF = ctxF
+	err := threads.Add("auto_gc_thread", c.gcBgThread)
+	if err != nil {
+		return err
+	}
+	for _, hook := range c.hooks {
+		err = hook.run(threads)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c *AutoGCController) gcBgThread(ctx context.Context) {
+	var wg sync.WaitGroup
+	runCh := make(chan autoGCWork)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		dbs := make([]autoGCWork, 0)
+		// Accumulate GC requests; only one will come in per database at a time.
+		// Send the oldest one out to the worker when it is ready.
+		for {
+			var toSendCh chan autoGCWork
+			var toSend autoGCWork
+			if len(dbs) > 0 {
+				toSend = dbs[0]
+				toSendCh = runCh
+			}
+			select {
+			case <-ctx.Done():
+				// sql.BackgroundThreads is shutting down.
+				// No need to drain or anything; just
+				// return.
+				return
+			case newDB := <-c.workCh:
+				dbs = append(dbs, newDB)
+			case toSendCh <- toSend:
+				// We just sent the front of the slice.
+				// Delete it from our set of pending GCs.
+				copy(dbs[:], dbs[1:])
+				dbs = dbs[:len(dbs)-1]
+			}
+
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case work := <-runCh:
+				c.doWork(ctx, work, c.ctxF)
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF func(context.Context) (*sql.Context, error)) {
+	defer close(work.done)
+	sqlCtx, err := ctxF(ctx)
+	if err != nil {
+		c.lgr.Warnf("sqle/auto_gc: Could not create session to GC %s: %v", work.name, err)
+		return
+	}
+	c.lgr.Tracef("sqle/auto_gc: Beginning auto GC of database %s", work.name)
+	start := time.Now()
+	defer sql.SessionEnd(sqlCtx.Session)
+	sql.SessionCommandBegin(sqlCtx.Session)
+	defer sql.SessionCommandEnd(sqlCtx.Session)
+	err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault, work.name)
+	if err != nil {
+		if !errors.Is(err, chunks.ErrNothingToCollect) {
+			c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err)
+		}
+		return
+	}
+	c.lgr.Infof("sqle/auto_gc: Successfully completed auto GC of database %s in %v", work.name, time.Since(start))
+}
+
+func (c *AutoGCController) newCommitHook(name string, db *doltdb.DoltDB) *autoGCCommitHook {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	closed := make(chan struct{})
+	close(closed)
+	ret := &autoGCCommitHook{
+		c:      c,
+		name:   name,
+		done:   closed,
+		next:   make(chan struct{}),
+		db:     db,
+		tickCh: make(chan struct{}),
+		stopCh: make(chan struct{}),
+	}
+	c.hooks[name] = ret
+	if c.threads != nil {
+		// If this errors, sql.BackgroundThreads is already closed.
+		// Things are hopefully shutting down...
+		_ = ret.run(c.threads)
+	}
+	return ret
+}
+
+// The doltdb.CommitHook which watches for database changes and
+// requests dolt_gcs.
+type autoGCCommitHook struct {
+	c    *AutoGCController
+	name string
+	// When |done| is closed, there is no GC currently running or
+	// pending for this database. If it is open, then there is a
+	// pending request for GC or a GC is currently running. Once
+	// |done| is closed, we can check for auto GC conditions on
+	// the database to see if we should request a new GC.
+	done chan struct{}
+	// It simplifies the logic and efficiency of the
+	// implementation a bit to have an already allocated channel
+	// we can try to send on when we request a GC. It will become
+	// our new |done| channel once we send it successfully.
+	next chan struct{}
+	// lastSz is set the first time we observe StoreSizes after a
+	// GC or after the server comes up. It is used in some simple
+	// growth heuristics to figure out if we want to run a GC. We
+	// set it back to |nil| when we successfully submit a request
+	// to GC, so that we observe and store the new size after the
+	// GC is finished.
+	lastSz *doltdb.StoreSizes
+
+	db *doltdb.DoltDB
+
+	// Closed when the thread should shut down because the database
+	// is being removed.
+	stopCh chan struct{}
+	// An optimistic send on this channel notifies the background
+	// thread that the sizes may have changed and it can check for
+	// the GC condition.
+	tickCh chan struct{}
+	wg     sync.WaitGroup
+}
+
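The queueing goroutine in gcBgThread above leans on a Go idiom worth spelling out: a send on a nil channel blocks forever, so select only considers the send case while the queue is non-empty. A self-contained sketch of the same pattern (a standalone illustration with made-up names, not Dolt code):

package main

import "fmt"

func main() {
	workCh := make(chan string) // incoming GC requests, one per database
	runCh := make(chan string)  // hands one request at a time to the worker

	// Producer: stands in for commit hooks submitting databases.
	go func() {
		for _, db := range []string{"db1", "db2", "db3"} {
			workCh <- db
		}
		close(workCh)
	}()

	// Queueing loop, analogous to the first goroutine in gcBgThread.
	go func() {
		defer close(runCh)
		var queue []string
		in := workCh
		for in != nil || len(queue) > 0 {
			var toSendCh chan string // nil: the send case is disabled
			var toSend string
			if len(queue) > 0 {
				toSend = queue[0]
				toSendCh = runCh
			}
			select {
			case db, ok := <-in:
				if !ok {
					in = nil // producer is done; drain what remains
					continue
				}
				queue = append(queue, db)
			case toSendCh <- toSend: // never chosen while toSendCh is nil
				queue = queue[1:] // sent the head; drop it
			}
		}
	}()

	// Worker, analogous to the second goroutine: one GC at a time.
	for db := range runCh {
		fmt.Println("would GC", db)
	}
}

+// During engine initialization, called on the original set of
+// databases to configure them for auto-GC.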
+func (c *AutoGCController) ApplyCommitHooks(ctx context.Context, mrEnv *env.MultiRepoEnv, dbs ...dsess.SqlDatabase) error {
+	for _, db := range dbs {
+		denv := mrEnv.GetEnv(db.Name())
+		if denv == nil {
+			continue
+		}
+		ddb := denv.DoltDB(ctx)
+		ddb.PrependCommitHooks(ctx, c.newCommitHook(db.Name(), ddb))
+	}
+	return nil
+}
+
+func (c *AutoGCController) DropDatabaseHook() DropDatabaseHook {
+	return func(_ *sql.Context, name string) {
+		c.mu.Lock()
+		defer c.mu.Unlock()
+		hook := c.hooks[name]
+		if hook != nil {
+			hook.stop()
+			delete(c.hooks, name)
+		}
+	}
+}
+
+func (c *AutoGCController) InitDatabaseHook() InitDatabaseHook {
+	return func(ctx *sql.Context, _ *DoltDatabaseProvider, name string, env *env.DoltEnv, _ dsess.SqlDatabase) error {
+		ddb := env.DoltDB(ctx)
+		ddb.PrependCommitHooks(ctx, c.newCommitHook(name, ddb))
+		return nil
+	}
+}
+
+func (h *autoGCCommitHook) Execute(ctx context.Context, _ datas.Dataset, _ *doltdb.DoltDB) (func(context.Context) error, error) {
+	select {
+	case h.tickCh <- struct{}{}:
+		return nil, nil
+	case <-ctx.Done():
+		return nil, context.Cause(ctx)
+	}
+}
+
+func (h *autoGCCommitHook) requestGC(ctx context.Context) error {
+	select {
+	case h.c.workCh <- autoGCWork{h.db, h.next, h.name}:
+		h.done = h.next
+		h.next = make(chan struct{})
+		h.lastSz = nil
+		return nil
+	case <-ctx.Done():
+		return context.Cause(ctx)
+	}
+}
+
+func (h *autoGCCommitHook) HandleError(ctx context.Context, err error) error {
+	return nil
+}
+
+func (h *autoGCCommitHook) SetLogger(ctx context.Context, wr io.Writer) error {
+	return nil
+}
+
+func (h *autoGCCommitHook) ExecuteForWorkingSets() bool {
+	return true
+}
+
+const checkInterval = 1 * time.Second
+const size_128mb = (1 << 27)
+const defaultCheckSizeThreshold = size_128mb
+
+func (h *autoGCCommitHook) checkForGC(ctx context.Context) error {
+	select {
+	case <-h.done:
+		sz, err := h.db.StoreSizes(ctx)
+		if err != nil {
+			// Something is probably quite wrong. Regardless, we can't determine whether we should GC.
+			return err
+		}
+		if h.lastSz == nil {
+			h.lastSz = &sz
+		}
+		if sz.JournalBytes > defaultCheckSizeThreshold {
+			// Our first heuristic is simply whether the journal is larger than a fixed size...
+			return h.requestGC(ctx)
+		} else if sz.TotalBytes > h.lastSz.TotalBytes && sz.TotalBytes-h.lastSz.TotalBytes > defaultCheckSizeThreshold {
+			// ...or whether the store has grown by a fixed amount since our last GC / since we started watching it.
+			return h.requestGC(ctx)
+		}
+	default:
+		// A GC is already running or pending. No need to check.
+	}
+	return nil
+}
+
+func (h *autoGCCommitHook) thread(ctx context.Context) {
+	defer h.wg.Done()
+	timer := time.NewTimer(checkInterval)
+	defer timer.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-h.stopCh:
+			return
+		case <-h.tickCh:
+			// We ignore an error here, which just means we didn't kick
+			// off a GC when we might have wanted to.
+			_ = h.checkForGC(ctx)
+		case <-timer.C:
+			_ = h.checkForGC(ctx)
+			timer.Reset(checkInterval)
+		}
+	}
+}
+
+func (h *autoGCCommitHook) stop() {
+	close(h.stopCh)
+	h.wg.Wait()
+}
+
+func (h *autoGCCommitHook) run(threads *sql.BackgroundThreads) error {
+	h.wg.Add(1)
+	return threads.Add("auto_gc_thread["+h.name+"]", h.thread)
+}
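The trigger policy in checkForGC above condenses to two conditions: the journal file alone exceeds 128MB, or the total store has grown by more than 128MB since the last observation. A standalone sketch with made-up names (not Dolt code) that works through both conditions:

package main

import "fmt"

const threshold = uint64(1 << 27) // 128MB, as in defaultCheckSizeThreshold

type storeSizes struct {
	JournalBytes uint64
	TotalBytes   uint64
}

// shouldGC mirrors the two checks in checkForGC; a nil baseline only
// seeds future growth comparisons, matching how lastSz is initialized.
func shouldGC(last *storeSizes, cur storeSizes) bool {
	if cur.JournalBytes > threshold {
		return true // the journal alone has passed the fixed size
	}
	if last != nil && cur.TotalBytes > last.TotalBytes &&
		cur.TotalBytes-last.TotalBytes > threshold {
		return true // the store grew by more than the threshold since last GC
	}
	return false
}

func main() {
	baseline := storeSizes{JournalBytes: 1 << 20, TotalBytes: 1 << 30}
	grown := storeSizes{JournalBytes: 1 << 20, TotalBytes: 1<<30 + 1<<28}
	fmt.Println(shouldGC(&baseline, baseline))                          // false: nothing changed
	fmt.Println(shouldGC(&baseline, grown))                             // true: grew by 256MB
	fmt.Println(shouldGC(&baseline, storeSizes{JournalBytes: 1 << 28})) // true: 256MB journal
}

diff --git a/go/libraries/doltcore/sqle/auto_gc_test.go b/go/libraries/doltcore/sqle/auto_gc_test.go
new file mode 100644
index 0000000000..367178c9a9
--- /dev/null
+++ b/go/libraries/doltcore/sqle/auto_gc_test.go
@@ -0,0 +1,117 @@
+// Copyright 2025 Dolthub, Inc.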
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqle + +import ( + "bytes" + "context" + "sync" + "testing" + "time" + + "github.com/dolthub/go-mysql-server/sql" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/datas" +) + +func TestAutoGCController(t *testing.T) { + NewLogger := func() *logrus.Logger { + res := logrus.New() + res.SetOutput(new(bytes.Buffer)) + return res + } + CtxFactory := func(ctx context.Context) (*sql.Context, error) { + return sql.NewContext(ctx, sql.WithSession(sql.NewBaseSession())), nil + } + t.Run("Hook", func(t *testing.T) { + t.Run("NeverStarted", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + hook := controller.newCommitHook("some_database", nil) + hook.stop() + }) + t.Run("StartedBeforeNewHook", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + bg := sql.NewBackgroundThreads() + defer bg.Shutdown() + err := controller.RunBackgroundThread(bg, CtxFactory) + require.NoError(t, err) + ctx := context.Background() + dEnv := CreateTestEnvWithName("some_database") + hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx)) + hook.Execute(ctx, datas.Dataset{}, nil) + hook.stop() + }) + t.Run("StartedAfterNewHook", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + bg := sql.NewBackgroundThreads() + defer bg.Shutdown() + ctx := context.Background() + dEnv := CreateTestEnvWithName("some_database") + hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx)) + err := controller.RunBackgroundThread(bg, CtxFactory) + require.NoError(t, err) + hook.Execute(ctx, datas.Dataset{}, nil) + hook.stop() + }) + t.Run("ExecuteOnCanceledCtx", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + dEnv := CreateTestEnvWithName("some_database") + hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx)) + _, err := hook.Execute(ctx, datas.Dataset{}, nil) + require.ErrorIs(t, err, context.Canceled) + }) + }) + t.Run("gcBgThread", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + controller.gcBgThread(ctx) + }() + time.Sleep(50 * time.Millisecond) + cancel() + wg.Wait() + }) + t.Run("DatabaseProviderHooks", func(t *testing.T) { + t.Run("Unstarted", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + ctx, err := CtxFactory(context.Background()) + require.NoError(t, err) + dEnv := CreateTestEnvWithName("some_database") + err = controller.InitDatabaseHook()(ctx, nil, "some_database", dEnv, nil) + require.NoError(t, err) + controller.DropDatabaseHook()(nil, "some_database") + }) + t.Run("Started", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + bg := sql.NewBackgroundThreads() + defer bg.Shutdown() + err := controller.RunBackgroundThread(bg, 
CtxFactory) + require.NoError(t, err) + ctx, err := CtxFactory(context.Background()) + require.NoError(t, err) + dEnv := CreateTestEnvWithName("some_database") + err = controller.InitDatabaseHook()(ctx, nil, "some_database", dEnv, nil) + require.NoError(t, err) + controller.DropDatabaseHook()(nil, "some_database") + }) + }) +} diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 487923dbcb..c6b07c3aed 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -317,7 +317,7 @@ func (c *Controller) applyCommitHooks(ctx context.Context, name string, denv *en } } commitHook := newCommitHook(c.lgr, r.Name(), remote.Url, name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) { - return remote.GetRemoteDB(ctx, types.Format_Default, dialprovider) + return remote.GetRemoteDBWithoutCaching(ctx, types.Format_Default, dialprovider) }, denv.DoltDB(ctx), ttfdir) denv.DoltDB(ctx).PrependCommitHooks(ctx, commitHook) hooks = append(hooks, commitHook) diff --git a/go/libraries/doltcore/sqle/cluster/initdbhook.go b/go/libraries/doltcore/sqle/cluster/initdbhook.go index 1fbcd57872..bd5eafe5b3 100644 --- a/go/libraries/doltcore/sqle/cluster/initdbhook.go +++ b/go/libraries/doltcore/sqle/cluster/initdbhook.go @@ -64,7 +64,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads) sqle } remoteDBs = append(remoteDBs, func(ctx context.Context) (*doltdb.DoltDB, error) { - return er.GetRemoteDB(ctx, types.Format_Default, dialprovider) + return er.GetRemoteDBWithoutCaching(ctx, types.Format_Default, dialprovider) }) remoteUrls = append(remoteUrls, remoteUrl) } diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 7d08abc1f0..e41e2d28bb 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -38,15 +38,13 @@ const ( cmdSuccess = 0 ) -var useSessionAwareSafepointController bool - func init() { if os.Getenv(dconfig.EnvDisableGcProcedure) != "" { DoltGCFeatureFlag = false } if choice := os.Getenv(dconfig.EnvGCSafepointControllerChoice); choice != "" { if choice == "session_aware" { - useSessionAwareSafepointController = true + UseSessionAwareSafepointController = true } else if choice != "kill_connections" { panic("Invalid value for " + dconfig.EnvGCSafepointControllerChoice + ". must be session_aware or kill_connections") } @@ -54,6 +52,7 @@ func init() { } var DoltGCFeatureFlag = true +var UseSessionAwareSafepointController = false // doltGC is the stored procedure to run online garbage collection on a database. 
func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) { @@ -158,28 +157,28 @@ func (sc killConnectionsSafepointController) CancelSafepoint() { } type sessionAwareSafepointController struct { - controller *dsess.GCSafepointController - callCtx *sql.Context - origEpoch int - doltDB *doltdb.DoltDB + controller *dsess.GCSafepointController + dbname string + callSession *dsess.DoltSession + origEpoch int + doltDB *doltdb.DoltDB waiter *dsess.GCSafepointWaiter keeper func(hash.Hash) bool } func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dsess.DoltSession) error { - return sess.VisitGCRoots(ctx, sc.callCtx.GetCurrentDatabase(), sc.keeper) + return sess.VisitGCRoots(ctx, sc.dbname, sc.keeper) } func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error { sc.doltDB.PurgeCaches() sc.keeper = keeper - thisSess := dsess.DSessFromSess(sc.callCtx.Session) - err := sc.visit(ctx, thisSess) + err := sc.visit(ctx, sc.callSession) if err != nil { return err } - sc.waiter = sc.controller.Waiter(ctx, thisSess, sc.visit) + sc.waiter = sc.controller.Waiter(ctx, sc.callSession, sc.visit) return nil } @@ -188,7 +187,7 @@ func (sc *sessionAwareSafepointController) EstablishPreFinalizeSafepoint(ctx con } func (sc *sessionAwareSafepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error { - return checkEpochSame(sc.origEpoch) + return nil } func (sc *sessionAwareSafepointController) CancelSafepoint() { @@ -232,50 +231,53 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { return cmdFailure, err } } else { - // Currently, if this server is involved in cluster - // replication, a full GC is only safe to run on the primary. - // We assert that we are the primary here before we begin, and - // we assert again that we are the primary at the same epoch as - // we establish the safepoint. + var mode types.GCMode = types.GCModeDefault + if apr.Contains(cli.FullFlag) { + mode = types.GCModeFull + } + + err := RunDoltGC(ctx, ddb, mode, ctx.GetCurrentDatabase()) + if err != nil { + return cmdFailure, err + } + } + + return cmdSuccess, nil +} + +func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, dbname string) error { + var sc types.GCSafepointController + if UseSessionAwareSafepointController { + dSess := dsess.DSessFromSess(ctx.Session) + gcSafepointController := dSess.GCSafepointController() + sc = &sessionAwareSafepointController{ + callSession: dSess, + dbname: dbname, + controller: gcSafepointController, + doltDB: ddb, + } + } else { + // Legacy safepoint controller behavior was to not + // allow GC on a standby server. GC on a standby server + // with killConnections safepoints should be safe now, + // but we retain this legacy behavior for now. origepoch := -1 if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok { // TODO: magic constant... 
if role.(string) != "primary" { - return cmdFailure, fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string)) + return fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string)) } _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable) if !ok { - return cmdFailure, fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable) + return fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable) } origepoch = epoch.(int) } - - var mode types.GCMode = types.GCModeDefault - if apr.Contains(cli.FullFlag) { - mode = types.GCModeFull - } - - var sc types.GCSafepointController - if useSessionAwareSafepointController { - gcSafepointController := dSess.GCSafepointController() - sc = &sessionAwareSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - controller: gcSafepointController, - doltDB: ddb, - } - } else { - sc = killConnectionsSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - doltDB: ddb, - } - } - err = ddb.GC(ctx, mode, sc) - if err != nil { - return cmdFailure, err + sc = killConnectionsSafepointController{ + origEpoch: origepoch, + callCtx: ctx, + doltDB: ddb, } } - - return cmdSuccess, nil + return ddb.GC(ctx, mode, sc) } diff --git a/go/store/nbs/file_table_reader.go b/go/store/nbs/file_table_reader.go index d808b81563..48e38ffdce 100644 --- a/go/store/nbs/file_table_reader.go +++ b/go/store/nbs/file_table_reader.go @@ -33,6 +33,8 @@ import ( "github.com/dolthub/dolt/go/store/hash" ) +var ErrTableFileNotFound = errors.New("table file not found") + type fileTableReader struct { tableReader h hash.Hash @@ -81,7 +83,7 @@ func newFileTableReader(ctx context.Context, dir string, h hash.Hash, chunkCount } else if afExists { return newArchiveChunkSource(ctx, dir, h, chunkCount, q) } - return nil, errors.New(fmt.Sprintf("table file %s/%s not found", dir, h.String())) + return nil, fmt.Errorf("error opening table file: %w: %s/%s", ErrTableFileNotFound, dir, h.String()) } func nomsFileTableReader(ctx context.Context, path string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) { diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index 7bbb4fd6d3..7e9598a4b6 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -498,6 +498,14 @@ func (j *ChunkJournal) AccessMode() chunks.ExclusiveAccessMode { return chunks.ExclusiveAccessMode_Exclusive } +func (j *ChunkJournal) Size() int64 { + if j.wr != nil { + return j.wr.size() + } else { + return 0 + } +} + type journalConjoiner struct { child conjoinStrategy } diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index caea111b14..1aedf7f073 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -472,6 +472,12 @@ func (wr *journalWriter) commitRootHash(ctx context.Context, root hash.Hash) err return wr.commitRootHashUnlocked(ctx, root) } +func (wr *journalWriter) size() int64 { + wr.lock.Lock() + defer wr.lock.Unlock() + return wr.off +} + func (wr *journalWriter) commitRootHashUnlocked(ctx context.Context, root hash.Hash) error { defer trace.StartRegion(ctx, "commit-root").End() diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 78ab22843b..1e5da5a4c1 100644 --- 
a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -955,10 +955,10 @@ func (nbs *NomsBlockStore) getManyWithFunc( nbs.stats.ChunksPerGet.Sample(uint64(len(hashes))) }() + reqs := toGetRecords(hashes) + const ioParallelism = 16 for { - reqs := toGetRecords(hashes) - nbs.mu.Lock() keeper := nbs.keeperFunc if gcDepMode == gcDependencyMode_NoDependency { @@ -1735,7 +1735,7 @@ func refCheckAllSources(ctx context.Context, nbs *NomsBlockStore, getAddrs chunk checkErr = err } if len(remaining) > 0 { - checkErr = fmt.Errorf("%w, missing: %v", errors.New("cannot add table files; referenced chunk not found in store."), remaining) + checkErr = fmt.Errorf("cannot add table files: %w, missing: %v", ErrTableFileNotFound, remaining) } } for _, source := range sources { diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go new file mode 100644 index 0000000000..de14ea2478 --- /dev/null +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -0,0 +1,337 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "fmt" + "math/rand/v2" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" +) + +func TestAutoGC(t *testing.T) { + var enabled_16, final_16, disabled, final_disabled RepoSize + t.Run("Enable", func(t *testing.T) { + t.Run("CommitEvery16", func(t *testing.T) { + var s AutoGCTest + s.Enable = true + enabled_16, final_16 = runAutoGCTest(t, &s, 64, 16) + assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC") + t.Logf("repo size before final gc: %v", enabled_16) + t.Logf("repo size after final gc: %v", final_16) + }) + t.Run("ClusterReplication", func(t *testing.T) { + var s AutoGCTest + s.Enable = true + s.Replicate = true + enabled_16, final_16 = runAutoGCTest(t, &s, 64, 16) + assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC") + assert.Contains(t, string(s.StandbyServer.Output.Bytes()), "Successfully completed auto GC") + t.Logf("repo size before final gc: %v", enabled_16) + t.Logf("repo size after final gc: %v", final_16) + rs, err := GetRepoSize(s.StandbyDir) + require.NoError(t, err) + t.Logf("standby size: %v", rs) + }) + }) + t.Run("Disabled", func(t *testing.T) { + var s AutoGCTest + disabled, final_disabled = runAutoGCTest(t, &s, 64, 128) + assert.NotContains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC") + t.Logf("repo size before final gc: %v", disabled) + t.Logf("repo size after final gc: %v", final_disabled) + }) + if enabled_16.NewGen > 0 && disabled.NewGen > 0 { + assert.Greater(t, enabled_16.OldGen, disabled.OldGen) + assert.Greater(t, enabled_16.OldGenC, disabled.OldGenC) + assert.Less(t, disabled.NewGen-disabled.Journal, 
disabled.Journal) + } +} + +type AutoGCTest struct { + Enable bool + PrimaryDir string + PrimaryServer *driver.SqlServer + PrimaryDB *sql.DB + + Replicate bool + StandbyDir string + StandbyServer *driver.SqlServer + StandbyDB *sql.DB +} + +func (s *AutoGCTest) Setup(ctx context.Context, t *testing.T) { + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + + s.CreatePrimaryServer(ctx, t, u) + + if s.Replicate { + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + s.CreateStandbyServer(ctx, t, u) + } + + s.CreatePrimaryDatabase(ctx, t) +} + +func (s *AutoGCTest) CreatePrimaryServer(ctx context.Context, t *testing.T, u driver.DoltUser) { + rs, err := u.MakeRepoStore() + require.NoError(t, err) + + repo, err := rs.MakeRepo("auto_gc_test") + require.NoError(t, err) + + behaviorFragment := fmt.Sprintf(` +behavior: + auto_gc_behavior: + enable: %v +`, s.Enable) + + var clusterFragment string + if s.Replicate { + clusterFragment = ` +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 +` + } + + err = driver.WithFile{ + Name: "server.yaml", + Contents: behaviorFragment + clusterFragment, + }.WriteAtDir(repo.Dir) + require.NoError(t, err) + + server := MakeServer(t, repo, &driver.Server{ + Args: []string{"--config", "server.yaml"}, + }) + server.DBName = "auto_gc_test" + + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + t.Cleanup(func() { + db.Close() + }) + + s.PrimaryDir = repo.Dir + s.PrimaryDB = db + s.PrimaryServer = server +} + +func (s *AutoGCTest) CreateStandbyServer(ctx context.Context, t *testing.T, u driver.DoltUser) { + rs, err := u.MakeRepoStore() + require.NoError(t, err) + + repo, err := rs.MakeRepo("auto_gc_test") + require.NoError(t, err) + + behaviorFragment := fmt.Sprintf(` +listener: + host: 0.0.0.0 + port: 3308 +behavior: + auto_gc_behavior: + enable: %v +`, s.Enable) + + var clusterFragment string + if s.Replicate { + clusterFragment = ` +cluster: + standby_remotes: + - name: primary + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 +` + } + + err = driver.WithFile{ + Name: "server.yaml", + Contents: behaviorFragment + clusterFragment, + }.WriteAtDir(repo.Dir) + require.NoError(t, err) + + server := MakeServer(t, repo, &driver.Server{ + Args: []string{"--config", "server.yaml"}, + Port: 3308, + }) + server.DBName = "auto_gc_test" + + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + t.Cleanup(func() { + db.Close() + }) + + s.StandbyDir = repo.Dir + s.StandbyDB = db + s.StandbyServer = server +} + +func (s *AutoGCTest) CreatePrimaryDatabase(ctx context.Context, t *testing.T) { + // Create the database... 
+	conn, err := s.PrimaryDB.Conn(ctx)
+	require.NoError(t, err)
+	_, err = conn.ExecContext(ctx, `
+create table vals (
+	id bigint primary key,
+	v1 bigint,
+	v2 bigint,
+	v3 bigint,
+	v4 bigint,
+	index (v1),
+	index (v2),
+	index (v3),
+	index (v4),
+	index (v1,v2),
+	index (v1,v3),
+	index (v1,v4),
+	index (v2,v3),
+	index (v2,v4),
+	index (v2,v1),
+	index (v3,v1),
+	index (v3,v2),
+	index (v3,v4),
+	index (v4,v1),
+	index (v4,v2),
+	index (v4,v3)
+)
+`)
+	require.NoError(t, err)
+	_, err = conn.ExecContext(ctx, "call dolt_commit('-Am', 'create vals table')")
+	require.NoError(t, err)
+	require.NoError(t, conn.Close())
+}
+
+func autoGCInsertStatement(i int) string {
+	var vals []string
+	for j := i * 1024; j < (i+1)*1024; j++ {
+		var vs [4]string
+		vs[0] = strconv.Itoa(rand.Int())
+		vs[1] = strconv.Itoa(rand.Int())
+		vs[2] = strconv.Itoa(rand.Int())
+		vs[3] = strconv.Itoa(rand.Int())
+		val := "(" + strconv.Itoa(j) + "," + strings.Join(vs[:], ",") + ")"
+		vals = append(vals, val)
+	}
+	return "insert into vals values " + strings.Join(vals, ",")
+}
+
+func runAutoGCTest(t *testing.T, s *AutoGCTest, numStatements int, commitEvery int) (RepoSize, RepoSize) {
+	// A simple auto-GC test, where we run
+	// operations on an auto GC server and
+	// ensure that the database is getting
+	// collected.
+	ctx := context.Background()
+	s.Setup(ctx, t)
+
+	for i := 0; i < numStatements; i++ {
+		conn, err := s.PrimaryDB.Conn(ctx)
+		require.NoError(t, err)
+		_, err = conn.ExecContext(ctx, autoGCInsertStatement(i))
+		require.NoError(t, err)
+		if i%commitEvery == 0 {
+			_, err = conn.ExecContext(ctx, "call dolt_commit('-am', 'insert from "+strconv.Itoa(i*1024)+"')")
+			require.NoError(t, err)
+		}
+		require.NoError(t, conn.Close())
+	}
+
+	before, err := GetRepoSize(s.PrimaryDir)
+	require.NoError(t, err)
+	conn, err := s.PrimaryDB.Conn(ctx)
+	require.NoError(t, err)
+	_, err = conn.ExecContext(ctx, "call dolt_gc('--full')")
+	require.NoError(t, err)
+	require.NoError(t, conn.Close())
+	after, err := GetRepoSize(s.PrimaryDir)
+	require.NoError(t, err)
+	return before, after
+}
+
+type RepoSize struct {
+	Journal int64
+	NewGen  int64
+	NewGenC int
+	OldGen  int64
+	OldGenC int
+}
+
+func GetRepoSize(dir string) (RepoSize, error) {
+	var ret RepoSize
+	entries, err := os.ReadDir(filepath.Join(dir, ".dolt/noms"))
+	if err != nil {
+		return ret, err
+	}
+	for _, e := range entries {
+		stat, err := e.Info()
+		if err != nil {
+			return ret, err
+		}
+		if stat.IsDir() {
+			continue
+		}
+		ret.NewGen += stat.Size()
+		ret.NewGenC += 1
+		if e.Name() == "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv" {
+			ret.Journal += stat.Size()
+		}
+	}
+	entries, err = os.ReadDir(filepath.Join(dir, ".dolt/noms/oldgen"))
+	if err != nil {
+		return ret, err
+	}
+	for _, e := range entries {
+		stat, err := e.Info()
+		if err != nil {
+			return ret, err
+		}
+		if stat.IsDir() {
+			continue
+		}
+		ret.OldGen += stat.Size()
+		ret.OldGenC += 1
+	}
+	return ret, nil
+}
+
+func (rs RepoSize) String() string {
+	return fmt.Sprintf("journal: %v, new gen: %v (%v files), old gen: %v (%v files)", rs.Journal, rs.NewGen, rs.NewGenC, rs.OldGen, rs.OldGenC)
+}
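One closing annotation: the new DoltDB.StoreSizes API added in doltdb.go is what the commit hook heuristics consume, but nothing ties it to auto-GC. A sketch of an out-of-tree consumer (logStoreSizes is a hypothetical helper, not part of this diff; |ddb| is assumed to be an already-open database):

package sizesexample

import (
	"context"

	"github.com/sirupsen/logrus"

	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
)

// logStoreSizes reports the approximate on-disk footprint of a DoltDB,
// using the same API the auto-GC commit hook polls once per second.
func logStoreSizes(ctx context.Context, lgr *logrus.Logger, ddb *doltdb.DoltDB) error {
	sz, err := ddb.StoreSizes(ctx)
	if err != nil {
		return err
	}
	lgr.Infof("journal=%d newgen=%d total=%d (bytes)",
		sz.JournalBytes, sz.NewGenBytes, sz.TotalBytes)
	return nil
}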