Split DB loading into two stages. #8852

Open: wants to merge 3 commits into main
Changes from 2 commits
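Reading the diff below is easier with the overall shape in mind: each DBFactory now exposes GetDBLoader instead of CreateDB, the returned DBLoader defers the actual database construction to LoadDB, and the package-level CreateDB becomes a thin wrapper over the two stages. The following is a minimal caller-side sketch of that flow; it is not part of the diff, and the function name openTwoStage is only illustrative.

package example

import (
	"context"
	"fmt"

	"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
	"github.com/dolthub/dolt/go/store/types"
)

// openTwoStage is an illustrative caller of the new API: stage 1 (GetDBLoader)
// verifies the database exists and acquires any locks, stage 2 (LoadDB) builds
// the datas.Database and its access interfaces.
func openTwoStage(ctx context.Context, nbf *types.NomsBinFormat, urlStr string, params map[string]interface{}) error {
	loader, err := dbfactory.GetDBLoader(ctx, nbf, urlStr, params)
	if err != nil {
		return err
	}
	if loader.IsAccessModeReadOnly() {
		// The caller learns before loading that it only has read-only access,
		// e.g. because another process already holds the manifest lock.
		fmt.Println("read-only access; a running sql-server may hold the lock")
	}
	db, vrw, ns, err := loader.LoadDB(ctx)
	if err != nil {
		return err
	}
	_, _, _ = db, vrw, ns // use the database, ValueReadWriter, and NodeStore
	return nil
}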
13 changes: 7 additions & 6 deletions go/cmd/dolt/dolt.go
@@ -417,7 +417,7 @@ func runMain() int {
args = nil

// This is the dEnv passed to sub-commands, and is used to create the multi-repo environment.
dEnv := env.LoadWithoutDB(ctx, env.GetCurrentUserHomeDir, cfg.dataDirFS, doltdb.LocalDirDoltDB, doltversion.Version)
dEnv := env.Load(ctx, env.GetCurrentUserHomeDir, cfg.dataDirFS, doltdb.LocalDirDoltDB, doltversion.Version)

if dEnv.CfgLoadErr != nil {
cli.PrintErrln(color.RedString("Failed to load the global config. %v", dEnv.CfgLoadErr))
@@ -678,11 +678,8 @@ If you're interested in running this command against a remote host, hit us up on
}

var lookForServer bool
if targetEnv.DoltDB(ctx) != nil && targetEnv.IsAccessModeReadOnly(ctx) {
// If the loaded target environment has a doltDB and we do not
// have access to it, we look for a server.
lookForServer = true
} else if targetEnv.DoltDB(ctx) == nil {

if !targetEnv.Exists() {
// If the loaded environment itself does not have a doltDB, we
// may want to look for a server. We do so if all of the
// repositories in our MultiEnv are ReadOnly. This includes the
@@ -695,6 +692,10 @@ If you're interested in running this command against a remote host, hit us up on
return !allReposAreReadOnly, nil
})
lookForServer = allReposAreReadOnly
} else if targetEnv.IsAccessModeReadOnly(ctx) {
// If the loaded target environment has a doltDB and we do not
// have access to it, we look for a server.
lookForServer = true
}
if lookForServer {
localCreds, err := sqlserver.FindAndLoadLocalCreds(targetEnv.FS)
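The server-lookup change above is split across two hunks, so here is the new decision consolidated in one place. This is only a restatement of the diff as a pure function (the MultiEnv iteration that computes allReposAreReadOnly is elided, and lookForServer as a standalone function does not exist in the PR).

package example

// lookForServer restates the reordered decision from dolt.go above in terms of
// the three facts it depends on (illustrative only).
func lookForServer(targetEnvExists, targetEnvReadOnly, allReposAreReadOnly bool) bool {
	if !targetEnvExists {
		// No doltDB in the target environment: only fall back to a server if
		// every repository in the MultiEnv is read-only.
		return allReposAreReadOnly
	}
	if targetEnvReadOnly {
		// The target environment has a doltDB but we could not get write
		// access to it, so look for a server that does.
		return true
	}
	return false
}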
13 changes: 3 additions & 10 deletions go/libraries/doltcore/dbfactory/aws.go
@@ -30,9 +30,7 @@ import (

"github.com/dolthub/dolt/go/libraries/utils/awsrefreshcreds"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
)

@@ -117,19 +115,14 @@ func (fact AWSFactory) PrepareDB(ctx context.Context, nbf *types.NomsBinFormat,
}

// CreateDB creates an AWS backed database
func (fact AWSFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
var db datas.Database
func (fact AWSFactory) GetDBLoader(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (DBLoader, error) {
cs, err := fact.newChunkStore(ctx, nbf, urlObj, params)

if err != nil {
return nil, nil, nil, err
return nil, err
}

vrw := types.NewValueStore(cs)
ns := tree.NewNodeStore(cs)
db = datas.NewTypesDatabase(vrw, ns)

return db, vrw, ns, nil
return ChunkStoreLoader{cs: cs}, nil
}

func (fact AWSFactory) newChunkStore(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (chunks.ChunkStore, error) {
41 changes: 36 additions & 5 deletions go/libraries/doltcore/dbfactory/factory.go
@@ -17,6 +17,7 @@ package dbfactory
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/store/chunks"
"net/url"
"strings"

@@ -57,10 +58,32 @@ const (
defaultMemTableSize = 256 * 1024 * 1024
)

type DBLoader interface {
IsAccessModeReadOnly() bool
LoadDB(ctx context.Context) (datas.Database, types.ValueReadWriter, tree.NodeStore, error)
}

type ChunkStoreLoader struct {
cs chunks.ChunkStore
}

var _ DBLoader = ChunkStoreLoader{}

func (l ChunkStoreLoader) IsAccessModeReadOnly() bool {
return false
}

func (l ChunkStoreLoader) LoadDB(ctx context.Context) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
vrw := types.NewValueStore(l.cs)
ns := tree.NewNodeStore(l.cs)
db := datas.NewTypesDatabase(vrw, ns)
return db, vrw, ns, nil
}

// DBFactory is an interface for creating concrete datas.Database instances from different backing stores
type DBFactory interface {
// CreateDB returns the database located at the URL given and its associated data access interfaces
CreateDB(ctx context.Context, nbf *types.NomsBinFormat, u *url.URL, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error)
// GetDBLoader verifies the DB exists and acquires any necessary locks, returning an object that can be used to load the DB.
GetDBLoader(ctx context.Context, nbf *types.NomsBinFormat, u *url.URL, params map[string]interface{}) (DBLoader, error)
// PrepareDB does any necessary setup work for a new database to be created at the URL given, e.g. to receive a push.
// Not all factories support this operation.
PrepareDB(ctx context.Context, nbf *types.NomsBinFormat, u *url.URL, params map[string]interface{}) error
@@ -83,10 +106,18 @@ var DBFactories = map[string]DBFactory{
// CreateDB creates a database based on the supplied urlStr, and creation params. The DBFactory used for creation is
// determined by the scheme of the url. Naked urls will use https by default.
func CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlStr string, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
dbLoader, err := GetDBLoader(ctx, nbf, urlStr, params)
if err != nil {
return nil, nil, nil, err
}
return dbLoader.LoadDB(ctx)
}

func GetDBLoader(ctx context.Context, nbf *types.NomsBinFormat, urlStr string, params map[string]interface{}) (DBLoader, error) {
urlObj, err := earl.Parse(urlStr)

if err != nil {
return nil, nil, nil, err
return nil, err
}

scheme := urlObj.Scheme
@@ -95,10 +126,10 @@ func CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlStr string, para
}

if fact, ok := DBFactories[strings.ToLower(scheme)]; ok {
return fact.CreateDB(ctx, nbf, urlObj, params)
return fact.GetDBLoader(ctx, nbf, urlObj, params)
}

return nil, nil, nil, fmt.Errorf("unknown url scheme: '%s'", urlObj.Scheme)
return nil, fmt.Errorf("unknown url scheme: '%s'", urlObj.Scheme)
}

// PrepareDB does the necessary work to create a database at the URL given, e.g. to ready a new remote for pushing. Not
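For the remote-backed factories, the refactor is largely mechanical: building the ValueStore, NodeStore, and datas.Database from a chunk store is the same everywhere, so that work moves into ChunkStoreLoader.LoadDB and each factory simply wraps its chunk store. The sketch below shows the resulting factory shape; exampleFactory and its newChunkStore method are illustrative stand-ins, while the GetDBLoader body mirrors the AWS and gRPC diffs in this PR.

package dbfactory

import (
	"context"
	"errors"
	"net/url"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/types"
)

// exampleFactory stands in for any remote-backed factory (AWS, gRPC, ...).
type exampleFactory struct{}

func (f exampleFactory) newChunkStore(ctx context.Context, nbf *types.NomsBinFormat, u *url.URL, params map[string]interface{}) (chunks.ChunkStore, error) {
	// Connect to the backing store; the details depend on the factory.
	return nil, errors.New("illustrative only: connect to the backing store here")
}

// GetDBLoader only establishes the chunk store; ChunkStoreLoader defers the
// ValueStore/NodeStore/Database construction to LoadDB and always reports
// writable access.
func (f exampleFactory) GetDBLoader(ctx context.Context, nbf *types.NomsBinFormat, u *url.URL, params map[string]interface{}) (DBLoader, error) {
	cs, err := f.newChunkStore(ctx, nbf, u, params)
	if err != nil {
		return nil, err
	}
	return ChunkStoreLoader{cs: cs}, nil
}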
106 changes: 74 additions & 32 deletions go/libraries/doltcore/dbfactory/file.go
@@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/dolthub/fslock"
"net/url"
"os"
"path/filepath"
@@ -62,12 +63,14 @@ type FileFactory struct {
}

type singletonDB struct {
ddb datas.Database
vrw types.ValueReadWriter
ns tree.NodeStore
lockFile *fslock.Lock
ddb datas.Database
vrw types.ValueReadWriter
ns tree.NodeStore
}

var singletonLock = new(sync.Mutex)
var singletonFileLocks = make(map[string]*fslock.Lock)
var singletons = make(map[string]singletonDB)

func CloseAllLocalDatabases() (err error) {
@@ -114,46 +117,40 @@ func (fact FileFactory) PrepareDB(ctx context.Context, nbf *types.NomsBinFormat,
return nil
}

// CreateDB creates a local filesys backed database
func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
singletonLock.Lock()
defer singletonLock.Unlock()

if s, ok := singletons[urlObj.Path]; ok {
return s.ddb, s.vrw, s.ns, nil
}

path, err := url.PathUnescape(urlObj.Path)
if err != nil {
return nil, nil, nil, err
}
type FileDBLoader struct {
urlPath string
path string
useJournal bool
nbf *types.NomsBinFormat
lockFile *fslock.Lock
}

path = filepath.FromSlash(path)
path = urlObj.Host + path
var _ DBLoader = FileDBLoader{}

err = validateDir(path)
if err != nil {
return nil, nil, nil, err
}
func (l FileDBLoader) IsAccessModeReadOnly() bool {
return l.lockFile == nil
}

var useJournal bool
if params != nil {
_, useJournal = params[ChunkJournalParam]
func (l FileDBLoader) LoadDB(ctx context.Context) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
singletonLock.Lock()
defer singletonLock.Unlock()
if s, ok := singletons[l.urlPath]; ok {
return s.ddb, s.vrw, s.ns, nil
}

var newGenSt *nbs.NomsBlockStore
var err error // TODO: Need to take both locks
q := nbs.NewUnlimitedMemQuotaProvider()
if useJournal && chunkJournalFeatureFlag {
newGenSt, err = nbs.NewLocalJournalingStore(ctx, nbf.VersionString(), path, q)
if l.useJournal && chunkJournalFeatureFlag {
newGenSt, err = nbs.NewLocalJournalingStoreWithLock(ctx, l.lockFile, l.nbf.VersionString(), l.path, q)
} else {
newGenSt, err = nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize, q)
newGenSt, err = nbs.NewLocalStoreWithLock(ctx, l.lockFile, l.nbf.VersionString(), l.path, defaultMemTableSize, q)
}

if err != nil {
return nil, nil, nil, err
}

oldgenPath := filepath.Join(path, "oldgen")
oldgenPath := filepath.Join(l.path, "oldgen")
err = validateDir(oldgenPath)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
@@ -171,7 +168,7 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
return nil, nil, nil, err
}

ghostGen, err := nbs.NewGhostBlockStore(path)
ghostGen, err := nbs.NewGhostBlockStore(l.path)
if err != nil {
return nil, nil, nil, err
}
@@ -183,7 +180,7 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
ns := tree.NewNodeStore(st)
ddb := datas.NewTypesDatabase(vrw, ns)

singletons[urlObj.Path] = singletonDB{
singletons[l.urlPath] = singletonDB{
ddb: ddb,
vrw: vrw,
ns: ns,
@@ -192,6 +189,51 @@ func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat,
return ddb, vrw, ns, nil
}

// CreateDB creates a local filesys backed database
func (fact FileFactory) GetDBLoader(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (dbLoader DBLoader, err error) {
singletonLock.Lock()
defer singletonLock.Unlock()

path, err := url.PathUnescape(urlObj.Path)
if err != nil {
return nil, err
}

path = filepath.FromSlash(path)
path = urlObj.Host + path

err = validateDir(path)
if err != nil {
return nil, err
}

var useJournal bool
if params != nil {
_, useJournal = params[ChunkJournalParam]
}

var lock *fslock.Lock

if s, ok := singletonFileLocks[urlObj.Path]; ok {
lock = s
} else {
lock, err = nbs.AcquireManifestLock(path)
Contributor review comment on the AcquireManifestLock call above: These lifecycle changes seem weird and complicated, and potentially wrong for non-journal stores (if those still exist... in that world the file manifest manager only holds the lock while it makes the switch...). Is there an alternative world where we change CreateDB() so that we can request ExclusiveAccess, and if it can't give it to us it bails early, and we can make the lazy queryist use that instead?

Contributor review comment (follow-up): I guess we're not actually looking for Exclusive. We're looking for non-ReadOnly. Same thought though...

if err != nil {
return nil, err
}

singletonFileLocks[urlObj.Path] = lock
}

return FileDBLoader{
urlPath: urlObj.Path,
path: path,
useJournal: useJournal,
lockFile: lock,
nbf: nbf,
}, nil
}

func validateDir(path string) error {
info, err := os.Stat(path)

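Two details of the file-backed loader above are easy to miss: GetDBLoader acquires the manifest lock at most once per path and caches it in singletonFileLocks, and LoadDB still short-circuits through the existing singletons map. The sketch below just exercises that behavior as I read the diff; fileLoaderSharing and the fileURL parameter are illustrative names, not part of the PR.

package example

import (
	"context"

	"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
	"github.com/dolthub/dolt/go/store/types"
)

// fileLoaderSharing opens the same file:// URL twice: both GetDBLoader calls
// share one cached fslock, and both LoadDB calls return the one cached
// singleton database for that path.
func fileLoaderSharing(ctx context.Context, nbf *types.NomsBinFormat, fileURL string, params map[string]interface{}) error {
	loaderA, err := dbfactory.GetDBLoader(ctx, nbf, fileURL, params) // acquires the manifest lock and caches it
	if err != nil {
		return err
	}
	loaderB, err := dbfactory.GetDBLoader(ctx, nbf, fileURL, params) // reuses the cached lock
	if err != nil {
		return err
	}
	dbA, _, _, err := loaderA.LoadDB(ctx) // opens the store and records it in the singletons map
	if err != nil {
		return err
	}
	dbB, _, _, err := loaderB.LoadDB(ctx) // hits the singletons map and returns the same database
	if err != nil {
		return err
	}
	_, _ = dbA, dbB // both handles refer to the same underlying singleton
	return nil
}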
17 changes: 5 additions & 12 deletions go/libraries/doltcore/dbfactory/grpc.go
@@ -27,8 +27,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
"github.com/dolthub/dolt/go/libraries/events"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
)

@@ -70,29 +68,24 @@ func NewDoltRemoteFactory(insecure bool) DoltRemoteFactory {

// CreateDB creates a database backed by a remote server that implements the GRPC rpcs defined by
// remoteapis.ChunkStoreServiceClient
func (fact DoltRemoteFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
var db datas.Database
func (fact DoltRemoteFactory) GetDBLoader(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (DBLoader, error) {

dpi, ok := params[GRPCDialProviderParam]
if dpi == nil || !ok {
return nil, nil, nil, errors.New("DoltRemoteFactory.CreateDB must provide a GRPCDialProvider param through GRPCDialProviderParam")
return nil, errors.New("DoltRemoteFactory.GetDBLoader must provide a GRPCDialProvider param through GRPCDialProviderParam")
}
dp, ok := dpi.(GRPCDialProvider)
if !ok {
return nil, nil, nil, errors.New("DoltRemoteFactory.CreateDB must provide a GRPCDialProvider param through GRPCDialProviderParam")
return nil, errors.New("DoltRemoteFactory.GetDBLoader must provide a GRPCDialProvider param through GRPCDialProviderParam")
}

cs, err := fact.newChunkStore(ctx, nbf, urlObj, params, dp)

if err != nil {
return nil, nil, nil, err
return nil, err
}

vrw := types.NewValueStore(cs)
ns := tree.NewNodeStore(cs)
db = datas.NewTypesDatabase(vrw, ns)

return db, vrw, ns, err
return ChunkStoreLoader{cs: cs}, nil
}

// If |params[NoCachingParameter]| is set in |params| of the CreateDB call for