Skip to content

Commit

Permalink
Merge pull request #8816 from dolthub/aaron/cluster-push-hook-sql-ses…
Browse files Browse the repository at this point in the history
…sion-lifecycle

[no-release-notes] go: sqle/cluster: commithook: Create a sql session and make lifecycle callbacks when accessing srcDB.
  • Loading branch information
reltuk authored Feb 11, 2025
2 parents 529e51e + b8a6bc7 commit ac6d4d4
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 25 deletions.
6 changes: 5 additions & 1 deletion go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewSqlEngine(

config.ClusterController.ManageSystemVariables(sql.SystemVariables)

err = config.ClusterController.ApplyStandbyReplicationConfig(ctx, bThreads, mrEnv, dbs...)
err = config.ClusterController.ApplyStandbyReplicationConfig(ctx, mrEnv, dbs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,6 +200,10 @@ func NewSqlEngine(
sqlEngine.engine = engine
sqlEngine.fs = pro.FileSystem()

if err = config.ClusterController.RunCommitHooks(bThreads, sqlEngine.NewDefaultContext); err != nil {
return nil, err
}

// configuring stats depends on sessionBuilder
// sessionBuilder needs ref to statsProv
if err = statsPro.Configure(ctx, sqlEngine.NewDefaultContext, bThreads, dbs); err != nil {
Expand Down
63 changes: 46 additions & 17 deletions go/libraries/doltcore/sqle/cluster/commithook.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type commithook struct {
lastSuccess time.Time
currentError *string
cancelReplicate func()
sqlCtxFactory SqlContextFactory

// waitNotify is set by controller when it needs to track whether the
// commithooks are caught up with replicating to the standby.
Expand Down Expand Up @@ -99,7 +100,8 @@ func newCommitHook(lgr *logrus.Logger, remotename, remoteurl, dbname string, rol
return &ret
}

func (h *commithook) Run(bt *sql.BackgroundThreads) error {
func (h *commithook) Run(bt *sql.BackgroundThreads, ctxF SqlContextFactory) error {
h.sqlCtxFactory = ctxF
return bt.Add("Standby Replication - "+h.dbname+" to "+h.remotename, h.run)
}

Expand Down Expand Up @@ -142,20 +144,30 @@ func (h *commithook) replicate(ctx context.Context) {
}
if h.primaryNeedsInit() {
lgr.Tracef("cluster/commithook: fetching current head.")
// When the replicate thread comes up, it attempts to replicate the current head.
datasDB := doltdb.HackDatasDatabaseFromDoltDB(h.srcDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
var err error
h.nextHead, err = cs.Root(ctx)
if err != nil {
// TODO: if err != nil, something is really wrong; should shutdown or backoff.
lgr.Warningf("standby replication thread failed to load database root: %v", err)
h.nextHead = hash.Hash{}
}
func() {
sqlCtx, err := h.sqlCtxFactory(ctx)
if err != nil {
lgr.Warningf("standby replication thread failed to load database root: could not create sql.Context: %v", err)
return
}
defer sql.SessionEnd(sqlCtx.Session)
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)

// When the replicate thread comes up, it attempts to replicate the current head.
datasDB := doltdb.HackDatasDatabaseFromDoltDB(h.srcDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
h.nextHead, err = cs.Root(ctx)
if err != nil {
// TODO: if err != nil, something is really wrong; should shutdown or backoff.
lgr.Warningf("standby replication thread failed to load database root: %v", err)
h.nextHead = hash.Hash{}
}

// We do not know when this head was written, but we
// are starting to try to replicate it now.
h.nextHeadIncomingTime = time.Now()
// We do not know when this head was written, but we
// are starting to try to replicate it now.
h.nextHeadIncomingTime = time.Now()
}()
} else if h.shouldReplicate() {
h.attemptReplicate(ctx)
shouldHeartbeat = false
Expand Down Expand Up @@ -255,6 +267,9 @@ func (h *commithook) attemptHeartbeat(ctx context.Context) {
}
h.cancelReplicate = nil
}()
// We do not take a sql.Context here. Our
// sql Session lifecycle events are for
// accessing srcDB, not destDB.
h.mu.Unlock()
datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
Expand Down Expand Up @@ -283,20 +298,34 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
defer h.progressNotifier.RecordFailure(attempt)
h.mu.Unlock()

sqlCtx, err := h.sqlCtxFactory(ctx)
if err != nil {
h.mu.Lock()
h.currentError = new(string)
*h.currentError = fmt.Sprintf("could not replicate to standby: error creating sql.Context: %v.", err)
lgr.Warnf("cluster/commithook: could not replicate to standby: error creating sql.Context: %v.", err)
if toPush == h.nextHead {
h.nextPushAttempt = time.Now().Add(1 * time.Second)
}
return
}
defer sql.SessionEnd(sqlCtx.Session)
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)

if destDB == nil {
lgr.Tracef("cluster/commithook: attempting to fetch destDB.")
var err error
destDB, err = h.destDBF(ctx)
if err != nil {
h.mu.Lock()
h.currentError = new(string)
*h.currentError = fmt.Sprintf("could not replicate to standby: error fetching destDB: %v", err)
lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v.", err)
h.mu.Lock()
// TODO: We could add some backoff here.
if toPush == h.nextHead {
h.nextPushAttempt = time.Now().Add(1 * time.Second)
}
h.cancelReplicate = nil
return
}
lgr.Tracef("cluster/commithook: fetched destDB")
Expand All @@ -306,7 +335,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
}

lgr.Tracef("cluster/commithook: pushing chunks for root hash %v to destDB", toPush.String())
err := destDB.PullChunks(ctx, h.tempDir, h.srcDB, []hash.Hash{toPush}, nil, nil)
err = destDB.PullChunks(ctx, h.tempDir, h.srcDB, []hash.Hash{toPush}, nil, nil)
if err == nil {
lgr.Tracef("cluster/commithook: successfully pushed chunks, setting root")
datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
Expand Down
27 changes: 21 additions & 6 deletions go/libraries/doltcore/sqle/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type databaseDropReplication struct {
wg *sync.WaitGroup
}

// SqlContextFactory builds a *sql.Context from the supplied context.Context,
// or returns an error when one cannot be created. The cluster commit hooks
// call it to obtain a session whose lifecycle callbacks bracket their
// accesses to the source database.
type SqlContextFactory func(ctx context.Context) (*sql.Context, error)

type Controller struct {
cfg servercfg.ClusterConfig
persistentCfg config.ReadWriteConfig
Expand Down Expand Up @@ -110,6 +112,8 @@ type Controller struct {
dropDatabase func(*sql.Context, string) error
outstandingDropDatabases map[string]*databaseDropReplication
remoteSrvDBCache remotesrv.DBCache

sqlCtxFactory SqlContextFactory
}

type sqlvars interface {
Expand Down Expand Up @@ -247,7 +251,7 @@ func (c *Controller) ManageSystemVariables(variables sqlvars) {
c.refreshSystemVars()
}

func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.BackgroundThreads, mrEnv *env.MultiRepoEnv, dbs ...dsess.SqlDatabase) error {
func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, mrEnv *env.MultiRepoEnv, dbs ...dsess.SqlDatabase) error {
if c == nil {
return nil
}
Expand All @@ -259,7 +263,7 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
continue
}
c.lgr.Tracef("cluster/controller: applying commit hooks for %s with role %s", db.Name(), string(c.role))
hooks, err := c.applyCommitHooks(ctx, db.Name(), bt, denv)
hooks, err := c.applyCommitHooks(ctx, db.Name(), denv)
if err != nil {
return err
}
Expand Down Expand Up @@ -291,7 +295,7 @@ func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery
c.killConnection = killConnection
}

func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) {
func (c *Controller) applyCommitHooks(ctx context.Context, name string, denv *env.DoltEnv) ([]*commithook, error) {
ttfdir, err := denv.TempTableFilesDir()
if err != nil {
return nil, err
Expand All @@ -316,14 +320,25 @@ func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.
return remote.GetRemoteDB(ctx, types.Format_Default, dialprovider)
}, denv.DoltDB(ctx), ttfdir)
denv.DoltDB(ctx).PrependCommitHooks(ctx, commitHook)
if err := commitHook.Run(bt); err != nil {
return nil, err
}
hooks = append(hooks, commitHook)
}
return hooks, nil
}

// RunCommitHooks records ctxF on the controller and then calls Run on every
// commit hook in c.commithooks, passing each one bt and ctxF.
//
// A nil Controller is a no-op and returns nil. The first error returned by a
// hook's Run is returned immediately; hooks after it are not started.
func (c *Controller) RunCommitHooks(bt *sql.BackgroundThreads, ctxF SqlContextFactory) error {
	if c == nil {
		return nil
	}

	// Keep the factory on the controller so that commit hooks created later
	// can be handed the same one.
	c.sqlCtxFactory = ctxF

	for _, ch := range c.commithooks {
		if err := ch.Run(bt, ctxF); err != nil {
			return err
		}
	}
	return nil
}

// gRPCDialProvider returns a grpcDialProvider for denv. It combines the dial
// provider derived from the Dolt environment with the controller's
// cinterceptor, tlsCfg and grpcCreds.
func (c *Controller) gRPCDialProvider(denv *env.DoltEnv) dbfactory.GRPCDialProvider {
	base := env.NewGRPCDialProviderFromDoltEnv(denv)
	return grpcDialProvider{base, &c.cinterceptor, c.tlsCfg, c.grpcCreds}
}
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/cluster/initdbhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads) sqle
commitHook := newCommitHook(controller.lgr, r.Name(), remoteUrls[i], name, role, remoteDBs[i], denv.DoltDB(ctx), ttfdir)
denv.DoltDB(ctx).PrependCommitHooks(ctx, commitHook)
controller.registerCommitHook(commitHook)
if err := commitHook.Run(bt); err != nil {
if err := commitHook.Run(bt, controller.sqlCtxFactory); err != nil {
// XXX: An error here means we are not replicating to every standby.
return err
}
Expand Down

0 comments on commit ac6d4d4

Please sign in to comment.