From dc4b94d68dc6b930957154b0c52b66efaa3b575d Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 10 Feb 2025 11:06:17 -0800 Subject: [PATCH] sql-server: Add behavior: auto_gc_behavior: enable. When Auto GC is enabled, the running sql-server will periodically collect a Dolt database that is growing in size. This behavior is currently experimental. Tuning the behavior around how often to collect is ongoing work. --- go/cmd/dolt/commands/engine/sqlengine.go | 16 +- .../commands/sqlserver/command_line_config.go | 11 + go/cmd/dolt/commands/sqlserver/server.go | 10 + go/libraries/doltcore/doltdb/doltdb.go | 22 +- .../doltcore/servercfg/serverconfig.go | 10 + .../doltcore/servercfg/yaml_config.go | 28 +++ .../doltcore/servercfg/yaml_config_test.go | 2 + go/libraries/doltcore/sqle/auto_gc.go | 225 ++++++++++++++++++ .../doltcore/sqle/dprocedures/dolt_gc.go | 75 +++--- go/store/nbs/journal.go | 4 + go/store/nbs/journal_writer.go | 6 + .../go-sql-server-driver/auto_gc_test.go | 220 +++++++++++++++++ 12 files changed, 579 insertions(+), 50 deletions(-) create mode 100644 go/libraries/doltcore/sqle/auto_gc.go create mode 100644 integration-tests/go-sql-server-driver/auto_gc_test.go diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index d7f9ee10fcf..e646da048ad 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -40,6 +40,7 @@ import ( dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle" dblr "github.com/dolthub/dolt/go/libraries/doltcore/sqle/binlogreplication" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/cluster" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/kvexec" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler" @@ -80,6 +81,7 @@ type SqlEngineConfig struct { JwksConfig []servercfg.JwksConfig SystemVariables 
SystemVariables ClusterController *cluster.Controller + AutoGCController *dsqle.AutoGCController BinlogReplicaController binlogreplication.BinlogReplicaController EventSchedulerStatus eventscheduler.SchedulerStatus } @@ -113,7 +115,8 @@ func NewSqlEngine( return nil, err } - all := dbs[:] + all := make([]dsess.SqlDatabase, len(dbs)) + copy(all, dbs) // this is overwritten only for server sessions for _, db := range dbs { @@ -192,6 +195,17 @@ func NewSqlEngine( statsPro := statspro.NewProvider(pro, statsnoms.NewNomsStatsFactory(mrEnv.RemoteDialProvider())) engine.Analyzer.Catalog.StatsProvider = statsPro + if config.AutoGCController != nil { + err = config.AutoGCController.RunBackgroundThread(bThreads, sqlEngine.NewDefaultContext) + if err != nil { + return nil, err + } + config.AutoGCController.ApplyCommitHooks(ctx, mrEnv, dbs...) + pro.InitDatabaseHooks = append(pro.InitDatabaseHooks, config.AutoGCController.InitDatabaseHook()) + // XXX: We force session aware safepoint controller if auto_gc is on. + dprocedures.UseSessionAwareSafepointController = true + } + engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{}) sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, gcSafepointController, config.Autocommit) sqlEngine.provider = pro diff --git a/go/cmd/dolt/commands/sqlserver/command_line_config.go b/go/cmd/dolt/commands/sqlserver/command_line_config.go index 0d9bf9afdd6..5d0d8cde29d 100755 --- a/go/cmd/dolt/commands/sqlserver/command_line_config.go +++ b/go/cmd/dolt/commands/sqlserver/command_line_config.go @@ -489,6 +489,10 @@ func (cfg *commandLineServerConfig) ValueSet(value string) bool { return ok } +func (cfg *commandLineServerConfig) AutoGCBehavior() servercfg.AutoGCBehavior { + return stubAutoGCBehavior{} +} + // DoltServerConfigReader is the default implementation of ServerConfigReader suitable for parsing Dolt config files // and command line options. 
type DoltServerConfigReader struct{} @@ -510,3 +514,10 @@ func (d DoltServerConfigReader) ReadConfigFile(cwdFS filesys.Filesys, file strin func (d DoltServerConfigReader) ReadConfigArgs(args *argparser.ArgParseResults, dataDirOverride string) (servercfg.ServerConfig, error) { return NewCommandLineConfig(nil, args, dataDirOverride) } + +type stubAutoGCBehavior struct { +} + +func (stubAutoGCBehavior) Enable() bool { + return false +} diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 33d253a377a..b3592cd265b 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -257,6 +257,16 @@ func ConfigureServices( } controller.Register(InitEventSchedulerStatus) + InitAutoGCController := &svcs.AnonService{ + InitF: func(context.Context) error { + if serverConfig.AutoGCBehavior().Enable() { + config.AutoGCController = sqle.NewAutoGCController(lgr) + } + return nil + }, + } + controller.Register(InitAutoGCController) + var sqlEngine *engine.SqlEngine InitSqlEngine := &svcs.AnonService{ InitF: func(ctx context.Context) (err error) { diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index e27a397915c..9982f3abc66 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1917,23 +1917,21 @@ func (ddb *DoltDB) IsTableFileStore() bool { // ChunkJournal returns the ChunkJournal for this DoltDB, if one is in use. 
func (ddb *DoltDB) ChunkJournal() *nbs.ChunkJournal { - tableFileStore, ok := datas.ChunkStoreFromDatabase(ddb.db).(chunks.TableFileStore) - if !ok { - return nil - } + cs := datas.ChunkStoreFromDatabase(ddb.db) - generationalNbs, ok := tableFileStore.(*nbs.GenerationalNBS) - if !ok { - return nil + var store *nbs.NomsBlockStore + generationalNBS, ok := cs.(*nbs.GenerationalNBS) + if ok { + store = generationalNBS.NewGen().(*nbs.NomsBlockStore) + } else { + store = cs.(*nbs.NomsBlockStore) } - newGen := generationalNbs.NewGen() - nbs, ok := newGen.(*nbs.NomsBlockStore) - if !ok { + if store != nil { + return store.ChunkJournal() + } else { return nil } - - return nbs.ChunkJournal() } func (ddb *DoltDB) TableFileStoreHasJournal(ctx context.Context) (bool, error) { diff --git a/go/libraries/doltcore/servercfg/serverconfig.go b/go/libraries/doltcore/servercfg/serverconfig.go index e69ae23bab0..79aa01c9588 100644 --- a/go/libraries/doltcore/servercfg/serverconfig.go +++ b/go/libraries/doltcore/servercfg/serverconfig.go @@ -48,6 +48,7 @@ const ( DefaultReadOnly = false DefaultLogLevel = LogLevel_Info DefaultAutoCommit = true + DefaultAutoGCBehaviorEnable = false DefaultDoltTransactionCommit = false DefaultMaxConnections = 100 DefaultDataDir = "." @@ -198,6 +199,8 @@ type ServerConfig interface { EventSchedulerStatus() string // ValueSet returns whether the value string provided was explicitly set in the config ValueSet(value string) bool + // AutoGCBehavior defines parameters around how auto-GC works for the running server. + AutoGCBehavior() AutoGCBehavior } // DefaultServerConfig creates a `*ServerConfig` that has all of the options set to their default values. 
@@ -214,6 +217,9 @@ func defaultServerConfigYAML() *YAMLConfig { ReadOnly: ptr(DefaultReadOnly), AutoCommit: ptr(DefaultAutoCommit), DoltTransactionCommit: ptr(DefaultDoltTransactionCommit), + AutoGCBehavior: &AutoGCBehaviorYAMLConfig{ + Enable_: ptr(DefaultAutoGCBehaviorEnable), + }, }, UserConfig: UserYAMLConfig{ Name: ptr(""), @@ -445,3 +451,7 @@ func CheckForUnixSocket(config ServerConfig) (string, bool, error) { return "", false, nil } + +type AutoGCBehavior interface { + Enable() bool +} diff --git a/go/libraries/doltcore/servercfg/yaml_config.go b/go/libraries/doltcore/servercfg/yaml_config.go index c5fc56b9778..5f370f33505 100644 --- a/go/libraries/doltcore/servercfg/yaml_config.go +++ b/go/libraries/doltcore/servercfg/yaml_config.go @@ -65,6 +65,8 @@ type BehaviorYAMLConfig struct { DoltTransactionCommit *bool `yaml:"dolt_transaction_commit,omitempty"` EventSchedulerStatus *string `yaml:"event_scheduler,omitempty" minver:"1.17.0"` + + AutoGCBehavior *AutoGCBehaviorYAMLConfig `yaml:"auto_gc_behavior,omitempty" minver:"TBD"` } // UserYAMLConfig contains server configuration regarding the user account clients must use to connect @@ -176,6 +178,7 @@ func YamlConfigFromFile(fs filesys.Filesys, path string) (ServerConfig, error) { func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig { systemVars := cfg.SystemVars() + autoGCBehavior := toAutoGCBehaviorYAML(cfg.AutoGCBehavior()) return &YAMLConfig{ LogLevelStr: ptr(string(cfg.LogLevel())), MaxQueryLenInLogs: nillableIntPtr(cfg.MaxLoggedQueryLen()), @@ -186,6 +189,7 @@ func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig { DisableClientMultiStatements: ptr(cfg.DisableClientMultiStatements()), DoltTransactionCommit: ptr(cfg.DoltTransactionCommit()), EventSchedulerStatus: ptr(cfg.EventSchedulerStatus()), + AutoGCBehavior: autoGCBehavior, }, ListenerConfig: ListenerYAMLConfig{ HostStr: ptr(cfg.Host()), @@ -817,6 +821,13 @@ func (cfg YAMLConfig) ClusterConfig() ClusterConfig { return cfg.ClusterCfg } 
+func (cfg YAMLConfig) AutoGCBehavior() AutoGCBehavior { + if cfg.BehaviorConfig.AutoGCBehavior == nil { + return nil + } + return cfg.BehaviorConfig.AutoGCBehavior +} + func (cfg YAMLConfig) EventSchedulerStatus() string { if cfg.BehaviorConfig.EventSchedulerStatus == nil { return "ON" @@ -922,3 +933,20 @@ func (cfg YAMLConfig) ValueSet(value string) bool { } return false } + +type AutoGCBehaviorYAMLConfig struct { + Enable_ *bool `yaml:"enable,omitempty" minver:"TBD"` +} + +func (a *AutoGCBehaviorYAMLConfig) Enable() bool { + if a.Enable_ == nil { + return false + } + return *a.Enable_ +} + +func toAutoGCBehaviorYAML(a AutoGCBehavior) *AutoGCBehaviorYAMLConfig { + return &AutoGCBehaviorYAMLConfig{ + Enable_: ptr(a.Enable()), + } +} diff --git a/go/libraries/doltcore/servercfg/yaml_config_test.go b/go/libraries/doltcore/servercfg/yaml_config_test.go index 76dd3f21f34..5f6dd64c62c 100644 --- a/go/libraries/doltcore/servercfg/yaml_config_test.go +++ b/go/libraries/doltcore/servercfg/yaml_config_test.go @@ -34,6 +34,8 @@ behavior: dolt_transaction_commit: true disable_client_multi_statements: false event_scheduler: ON + auto_gc_behavior: + enable: false listener: host: localhost diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go new file mode 100644 index 00000000000..23995a84f8b --- /dev/null +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -0,0 +1,225 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package sqle + +import ( + "context" + "io" + "sync" + "time" + + "github.com/dolthub/go-mysql-server/sql" + "github.com/sirupsen/logrus" + + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/env" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/dolt/go/store/types" +) + +// Auto GC is the ability of a running SQL server engine to perform +// dolt_gc() behaviors periodically. If enabled, it currently works as +// follows: +// +// An AutoGCController is created for a running SQL Engine. The +// controller runs a background thread which is only ever running one +// GC at a time. Post Commit Hooks are installed on every database in +// the DoltDatabaseProvider for the SQL Engine. Those hooks check if +// it is time to perform a GC for that particular database. If it is, +// they forward a request to the background thread to register the +// database as wanting a GC. + +type AutoGCController struct { + workCh chan autoGCWork + lgr *logrus.Logger +} + +func NewAutoGCController(lgr *logrus.Logger) *AutoGCController { + return &AutoGCController{ + workCh: make(chan autoGCWork), + lgr: lgr, + } +} + +// Passed by a commit hook to the auto-GC thread, requesting the +// thread to dolt_gc |db|. When the GC is finished, |done| will be +// closed. Signalling completion allows the commit hook to only +// submit one dolt_gc request at a time. +type autoGCWork struct { + db *doltdb.DoltDB + done chan struct{} + name string // only for logging. +} + +// During engine initialization, this should be called to ensure the +// background worker threads responsible for performing the GC are +// running.
+func (c *AutoGCController) RunBackgroundThread(threads *sql.BackgroundThreads, ctxF func(context.Context) (*sql.Context, error)) error { + return threads.Add("auto_gc_thread", func(ctx context.Context) { + var wg sync.WaitGroup + runCh := make(chan autoGCWork) + wg.Add(1) + go func() { + defer wg.Done() + dbs := make([]autoGCWork, 0) + // Accumulate GC requests, only one will come in per database at a time. + // Send the oldest one out to the worker when it is ready. + for { + var toSendCh chan autoGCWork + var toSend autoGCWork + if len(dbs) > 0 { + toSend = dbs[0] + toSendCh = runCh + } + select { + case <-ctx.Done(): + // sql.BackgroundThreads is shutting down. + // No need to drain or anything; just + // return. + return + case newDB := <-c.workCh: + dbs = append(dbs, newDB) + case toSendCh <- toSend: + // We just sent the front of the slice. + // Delete it from our set of pending GCs. + copy(dbs[:], dbs[1:]) + dbs = dbs[:len(dbs)-1] + } + + } + }() + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case work := <-runCh: + c.doWork(ctx, work, ctxF) + } + } + }() + wg.Wait() + }) +} + +func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF func(context.Context) (*sql.Context, error)) { + defer close(work.done) + sqlCtx, err := ctxF(ctx) + if err != nil { + c.lgr.Warnf("sqle/auto_gc: Could not create session to GC %s: %v", work.name, err) + return + } + c.lgr.Tracef("sqle/auto_gc: Beginning auto GC of database %s", work.name) + start := time.Now() + defer sql.SessionEnd(sqlCtx.Session) + sql.SessionCommandBegin(sqlCtx.Session) + defer sql.SessionCommandEnd(sqlCtx.Session) + err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault) + if err != nil { + c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err) + return + } + c.lgr.Infof("sqle/auto_gc: Successfully completed auto GC of database %s in %v", work.name, time.Since(start)) +} + +func (c 
*AutoGCController) newCommitHook(name string) doltdb.CommitHook { + closed := make(chan struct{}) + close(closed) + return &autoGCCommitHook{ + c: c, + name: name, + done: closed, + next: make(chan struct{}), + } +} + +// The doltdb.CommitHook which watches for database changes and +// requests dolt_gcs. +type autoGCCommitHook struct { + c *AutoGCController + name string + // Always non-nil. This channel delivers only when no GC is + // currently running or pending. + done chan struct{} + // It simplifies the logic and efficiency of the + // implementation a bit to have a pre-allocated channel + // we can send. It becomes our new |done| channel on a + // successful send. + next chan struct{} + // |done| and |next| are mutable and |Execute| can be called + // concurrently. We protect them with |mu|. + mu sync.Mutex +} + +// During engine initialization, called on the original set of +// databases to configure them for auto-GC. +func (c *AutoGCController) ApplyCommitHooks(ctx context.Context, mrEnv *env.MultiRepoEnv, dbs ...dsess.SqlDatabase) error { + for _, db := range dbs { + denv := mrEnv.GetEnv(db.Name()) + if denv == nil { + continue + } + denv.DoltDB(ctx).PrependCommitHooks(ctx, c.newCommitHook(db.Name())) + } + return nil +} + +func (c *AutoGCController) InitDatabaseHook() InitDatabaseHook { + return func(ctx *sql.Context, pro *DoltDatabaseProvider, name string, env *env.DoltEnv, db dsess.SqlDatabase) error { + env.DoltDB(ctx).PrependCommitHooks(ctx, c.newCommitHook(name)) + return nil + } +} + +func (h *autoGCCommitHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) { + h.mu.Lock() + defer h.mu.Unlock() + select { + case <-h.done: + journal := db.ChunkJournal() + if journal != nil { + const size_128mb = (1 << 27) + if journal.Size() > size_128mb { + // We want a GC...
+ select { + case h.c.workCh <- autoGCWork{db, h.next, h.name}: + h.done = h.next + h.next = make(chan struct{}) + case <-ctx.Done(): + return nil, context.Cause(ctx) + } + } + } + default: + // A GC is running or pending. No need to check. + } + return nil, nil +} + +func (h *autoGCCommitHook) HandleError(ctx context.Context, err error) error { + return nil +} + +func (h *autoGCCommitHook) SetLogger(ctx context.Context, wr io.Writer) error { + return nil +} + +func (h *autoGCCommitHook) ExecuteForWorkingSets() bool { + return true +} diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index c2acc6f5f9e..e798ee1a594 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -27,6 +27,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" @@ -37,15 +38,13 @@ const ( cmdSuccess = 0 ) -var useSessionAwareSafepointController bool - func init() { if os.Getenv(dconfig.EnvDisableGcProcedure) != "" { DoltGCFeatureFlag = false } if choice := os.Getenv(dconfig.EnvGCSafepointControllerChoice); choice != "" { if choice == "session_aware" { - useSessionAwareSafepointController = true + UseSessionAwareSafepointController = true } else if choice != "kill_connections" { panic("Invalid value for " + dconfig.EnvGCSafepointControllerChoice + ". must be session_aware or kill_connections") } @@ -53,6 +52,7 @@ func init() { } var DoltGCFeatureFlag = true +var UseSessionAwareSafepointController = false // doltGC is the stored procedure to run online garbage collection on a database. 
func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) { @@ -156,7 +156,6 @@ func (sc killConnectionsSafepointController) CancelSafepoint() { type sessionAwareSafepointController struct { controller *dsess.GCSafepointController callCtx *sql.Context - origEpoch int waiter *dsess.GCSafepointWaiter keeper func(hash.Hash) bool @@ -182,7 +181,7 @@ func (sc *sessionAwareSafepointController) EstablishPreFinalizeSafepoint(ctx con } func (sc *sessionAwareSafepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error { - return checkEpochSame(sc.origEpoch) + return nil } func (sc *sessionAwareSafepointController) CancelSafepoint() { @@ -226,48 +225,50 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { return cmdFailure, err } } else { - // Currently, if this server is involved in cluster - // replication, a full GC is only safe to run on the primary. - // We assert that we are the primary here before we begin, and - // we assert again that we are the primary at the same epoch as - // we establish the safepoint. + var mode types.GCMode = types.GCModeDefault + if apr.Contains(cli.FullFlag) { + mode = types.GCModeFull + } + + err := RunDoltGC(ctx, ddb, mode) + if err != nil { + return cmdFailure, err + } + } + + return cmdSuccess, nil +} + +func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode) error { + var sc types.GCSafepointController + if UseSessionAwareSafepointController { + dSess := dsess.DSessFromSess(ctx.Session) + gcSafepointController := dSess.GCSafepointController() + sc = &sessionAwareSafepointController{ + callCtx: ctx, + controller: gcSafepointController, + } + } else { + // Legacy safepoint controller behavior was to not + // allow GC on a standby server. GC on a standby server + // with killConnections safepoints should be safe now, + // but we retain this legacy behavior for now. 
origepoch := -1 if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok { // TODO: magic constant... if role.(string) != "primary" { - return cmdFailure, fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string)) + return fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string)) } _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable) if !ok { - return cmdFailure, fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable) + return fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable) } origepoch = epoch.(int) } - - var mode types.GCMode = types.GCModeDefault - if apr.Contains(cli.FullFlag) { - mode = types.GCModeFull - } - - var sc types.GCSafepointController - if useSessionAwareSafepointController { - gcSafepointController := dSess.GCSafepointController() - sc = &sessionAwareSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - controller: gcSafepointController, - } - } else { - sc = killConnectionsSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - } - } - err = ddb.GC(ctx, mode, sc) - if err != nil { - return cmdFailure, err + sc = killConnectionsSafepointController{ + origEpoch: origepoch, + callCtx: ctx, } } - - return cmdSuccess, nil + return ddb.GC(ctx, mode, sc) } diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index e33c8614ae8..16310f15cdd 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -490,6 +490,10 @@ func (j *ChunkJournal) AccessMode() chunks.ExclusiveAccessMode { return chunks.ExclusiveAccessMode_Exclusive } +func (j *ChunkJournal) Size() int64 { + return j.wr.size() +} + type journalConjoiner struct { child 
conjoinStrategy } diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index caea111b147..1aedf7f073d 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -472,6 +472,12 @@ func (wr *journalWriter) commitRootHash(ctx context.Context, root hash.Hash) err return wr.commitRootHashUnlocked(ctx, root) } +func (wr *journalWriter) size() int64 { + wr.lock.Lock() + defer wr.lock.Unlock() + return wr.off +} + func (wr *journalWriter) commitRootHashUnlocked(ctx context.Context, root hash.Hash) error { defer trace.StartRegion(ctx, "commit-root").End() diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go new file mode 100644 index 00000000000..8f89f3b8820 --- /dev/null +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -0,0 +1,220 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "context" + "database/sql" + "fmt" + "math/rand/v2" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" +) + +func TestAutoGC(t *testing.T) { + var enabled_16, final_16, disabled, final_disabled RepoSize + t.Run("Enable", func(t *testing.T) { + t.Run("CommitEvery16", func(t *testing.T) { + enabled_16, final_16 = runAutoGCTest(t, true, 16) + t.Logf("repo size before final gc: %v", enabled_16) + t.Logf("repo size after final gc: %v", final_16) + }) + }) + t.Run("Disabled", func(t *testing.T) { + disabled, final_disabled = runAutoGCTest(t, false, 128) + t.Logf("repo size before final gc: %v", disabled) + t.Logf("repo size after final gc: %v", final_disabled) + }) + if enabled_16.NewGen > 0 && disabled.NewGen > 0 { + assert.Greater(t, enabled_16.OldGen, disabled.OldGen) + assert.Greater(t, enabled_16.OldGenC, disabled.OldGenC) + assert.Greater(t, enabled_16.NewGen-enabled_16.Journal, enabled_16.Journal) + assert.Less(t, disabled.NewGen-disabled.Journal, disabled.Journal) + } +} + +func setupAutoGCTest(ctx context.Context, t *testing.T, enable bool) (string, *sql.DB) { + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + + rs, err := u.MakeRepoStore() + require.NoError(t, err) + + repo, err := rs.MakeRepo("auto_gc_test") + require.NoError(t, err) + + err = driver.WithFile{ + Name: "server.yaml", + Contents: fmt.Sprintf(` +behavior: + auto_gc_behavior: + enable: %v +`, enable), + }.WriteAtDir(repo.Dir) + require.NoError(t, err) + + server := MakeServer(t, repo, &driver.Server{ + Args: []string{"--config", "server.yaml"}, + }) + server.DBName = "auto_gc_test" + + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + t.Cleanup(func() { + db.Close() + }) + + // Create the database... 
+ conn, err := db.Conn(ctx) + require.NoError(t, err) + _, err = conn.ExecContext(ctx, ` +create table vals ( + id bigint primary key, + v1 bigint, + v2 bigint, + v3 bigint, + v4 bigint, + index (v1), + index (v2), + index (v3), + index (v4), + index (v1,v2), + index (v1,v3), + index (v1,v4), + index (v2,v3), + index (v2,v4), + index (v2,v1), + index (v3,v1), + index (v3,v2), + index (v3,v4), + index (v4,v1), + index (v4,v2), + index (v4,v3) +) +`) + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "call dolt_commit('-Am', 'create vals table')") + require.NoError(t, err) + require.NoError(t, conn.Close()) + + return repo.Dir, db +} + +func autoGCInsertStatement(i int) string { + var vals []string + for j := i * 1024; j < (i+1)*1024; j++ { + var vs [4]string + vs[0] = strconv.Itoa(rand.Int()) + vs[1] = strconv.Itoa(rand.Int()) + vs[2] = strconv.Itoa(rand.Int()) + vs[3] = strconv.Itoa(rand.Int()) + val := "(" + strconv.Itoa(j) + "," + strings.Join(vs[:], ",") + ")" + vals = append(vals, val) + } + return "insert into vals values " + strings.Join(vals, ",") +} + +func runAutoGCTest(t *testing.T, enable bool, commitEvery int) (RepoSize, RepoSize) { + // A simple auto-GC test, where we run + // operations on an auto GC server and + // ensure that the database is getting + // collected. 
+ ctx := context.Background() + dir, db := setupAutoGCTest(ctx, t, enable) + + for i := 0; i < 64; i++ { + stmt := autoGCInsertStatement(i) + conn, err := db.Conn(ctx) + _, err = conn.ExecContext(ctx, stmt) + require.NoError(t, err) + if i%commitEvery == 0 { + _, err = conn.ExecContext(ctx, "call dolt_commit('-am', 'insert from "+strconv.Itoa(i*1024)+"')") + require.NoError(t, err) + } + require.NoError(t, conn.Close()) + } + + before, err := GetRepoSize(dir) + require.NoError(t, err) + conn, err := db.Conn(ctx) + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "call dolt_gc('--full')") + require.NoError(t, err) + require.NoError(t, conn.Close()) + after, err := GetRepoSize(dir) + require.NoError(t, err) + return before, after +} + +type RepoSize struct { + Journal int64 + NewGen int64 + NewGenC int + OldGen int64 + OldGenC int +} + +func GetRepoSize(dir string) (RepoSize, error) { + var ret RepoSize + entries, err := os.ReadDir(filepath.Join(dir, ".dolt/noms")) + if err != nil { + return ret, err + } + for _, e := range entries { + stat, err := e.Info() + if err != nil { + return ret, err + } + if stat.IsDir() { + continue + } + ret.NewGen += stat.Size() + ret.NewGenC += 1 + if e.Name() == "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv" { + ret.Journal += stat.Size() + } + } + entries, err = os.ReadDir(filepath.Join(dir, ".dolt/noms/oldgen")) + if err != nil { + return ret, err + } + for _, e := range entries { + stat, err := e.Info() + if err != nil { + return ret, err + } + if stat.IsDir() { + continue + } + ret.OldGen += stat.Size() + ret.OldGenC += 1 + } + return ret, nil +} + +func (rs RepoSize) String() string { + return fmt.Sprintf("journal: %v, new gen: %v (%v files), old gen: %v (%v files)", rs.Journal, rs.NewGen, rs.NewGenC, rs.OldGen, rs.OldGenC) +}