[no-release-notes] go: sqle: dolt_gc: Purge read caches associated with the DoltDB as we begin GC.

The right place to do this is in the safepoint controller, at BeginGC. We need
to do it after keeperFunc is installed on the NomsBlockStore, so that all read
dependencies are taken. We also need to do it before we register all sessions
whose cached state we need to visit.
reltuk committed Feb 12, 2025
1 parent 846b67e commit 18d7ceb
Showing 8 changed files with 96 additions and 4 deletions.
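
The ordering the commit message describes can be sketched as follows. This is an illustrative outline only, not the repository's GC driver: the package name, the Hash type, installKeeper, and visitSessions are stand-ins for the real machinery in go/store and the safepoint controllers shown in the diff below.

package gcsketch

import "context"

// Hash stands in for hash.Hash; the keeper reports whether a chunk must
// survive the collection.
type Hash [20]byte

// GCSafepointController is a simplified stand-in for the safepoint
// controllers defined in dolt_gc.go; only BeginGC matters for this sketch.
type GCSafepointController interface {
	BeginGC(ctx context.Context, keeper func(Hash) bool) error
}

// runGC sketches the ordering the commit message describes:
//  1. install the keeper on the block store, so every read from now on is
//     recorded as a GC root;
//  2. call BeginGC, which (after this commit) purges the DoltDB read caches;
//  3. only then visit/register sessions, so any state they still cache is
//     re-read through the keeper and kept alive.
func runGC(ctx context.Context, sc GCSafepointController,
	installKeeper func() func(Hash) bool,
	visitSessions func(context.Context, func(Hash) bool) error) error {

	keeper := installKeeper()

	if err := sc.BeginGC(ctx, keeper); err != nil {
		return err
	}

	return visitSessions(ctx, keeper)
}
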
9 changes: 9 additions & 0 deletions go/libraries/doltcore/doltdb/doltdb.go
@@ -2191,6 +2191,15 @@ func (ddb *DoltDB) PersistGhostCommits(ctx context.Context, ghostCommits hash.Ha
return ddb.db.Database.PersistGhostCommitIDs(ctx, ghostCommits)
}

// Purge in-memory read caches associated with this DoltDB. This needs
// to be done at a specific point during a GC operation to ensure that
// everything the application layer sees still exists in the database
// after the GC operation is completed.
func (ddb *DoltDB) PurgeCaches() {
ddb.vrw.PurgeCaches()
ddb.ns.PurgeCaches()
}

type FSCKReport struct {
ChunkCount uint32
Problems []error
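
To make the constraint in the PurgeCaches comment concrete, here is a schematic timeline of the hazard the purge closes. It is an illustration in Go comment form, not code from the repository, and the step names are informal.

// Illustrative timeline (not repository code):
//
//   t0: a Node/Value for address A is read and lands in an in-memory cache.
//   t1: GC starts; nothing references A any more, so its chunk is collected.
//   t2: without a purge, the cache still answers for A — the application can
//       see data whose backing chunk no longer exists, and anything newly
//       written against it would reference missing chunks.
//
// Purging inside BeginGC, after the keeper function is installed, removes t2:
// whatever the application still needs is re-read, the re-read is observed by
// the keeper, and the chunk therefore survives the collection.
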
7 changes: 7 additions & 0 deletions go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
@@ -27,6 +27,7 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
@@ -73,10 +74,12 @@ var ErrServerPerformedGC = errors.New("this connection was established when this
// invalidated in such a way that all future queries on it return an error.
type killConnectionsSafepointController struct {
callCtx *sql.Context
doltDB *doltdb.DoltDB
origEpoch int
}

func (sc killConnectionsSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
sc.doltDB.PurgeCaches()
return nil
}

@@ -157,6 +160,7 @@ type sessionAwareSafepointController struct {
controller *dsess.GCSafepointController
callCtx *sql.Context
origEpoch int
doltDB *doltdb.DoltDB

waiter *dsess.GCSafepointWaiter
keeper func(hash.Hash) bool
@@ -167,6 +171,7 @@ func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dses
}

func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
sc.doltDB.PurgeCaches()
sc.keeper = keeper
thisSess := dsess.DSessFromSess(sc.callCtx.Session)
err := sc.visit(ctx, thisSess)
@@ -256,11 +261,13 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
origEpoch: origepoch,
callCtx: ctx,
controller: gcSafepointController,
doltDB: ddb,
}
} else {
sc = killConnectionsSafepointController{
origEpoch: origepoch,
callCtx: ctx,
doltDB: ddb,
}
}
err = ddb.GC(ctx, mode, sc)
15 changes: 15 additions & 0 deletions go/store/prolly/tree/node_cache.go
@@ -48,6 +48,12 @@ func (c nodeCache) insert(addr hash.Hash, node Node) {
s.insert(addr, node)
}

func (c nodeCache) purge() {
for _, s := range c.stripes {
s.purge()
}
}

type centry struct {
a hash.Hash
n Node
@@ -83,6 +89,15 @@ func removeFromList(e *centry) {
e.next = e
}

func (s *stripe) purge() {
s.mu.Lock()
defer s.mu.Unlock()
s.chunks = make(map[hash.Hash]*centry)
s.head = nil
s.sz = 0
s.rev = 0
}

func (s *stripe) moveToFront(e *centry) {
e.i = s.rev
s.rev++
47 changes: 47 additions & 0 deletions go/store/prolly/tree/node_cache_test.go
@@ -0,0 +1,47 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tree

import (
"testing"

"github.com/dolthub/dolt/go/store/hash"
"github.com/stretchr/testify/assert"
)

func TestNodeCache(t *testing.T) {
t.Run("InsertGetPurge", func(t *testing.T) {
// Simple smoke test of insert, get, and purge.
var addr hash.Hash
var n Node
n.msg = make([]byte, 1024)
cache := newChunkCache(64 * 1024)
for i := 0; i < 32; i++ {
addr[0] = byte(i)
cache.insert(addr, n)
}
for i := 0; i < 32; i++ {
addr[0] = byte(i)
_, ok := cache.get(addr)
assert.True(t, ok)
}
cache.purge()
for i := 0; i < 32; i++ {
addr[0] = byte(i)
_, ok := cache.get(addr)
assert.False(t, ok)
}
})
}
6 changes: 6 additions & 0 deletions go/store/prolly/tree/node_store.go
@@ -49,6 +49,8 @@ type NodeStore interface {

BlobBuilder() *BlobBuilder
PutBlobBuilder(*BlobBuilder)

PurgeCaches()
}

type nodeStore struct {
@@ -194,3 +196,7 @@ func (ns nodeStore) Format() *types.NomsBinFormat {
}
return nbf
}

func (ns nodeStore) PurgeCaches() {
ns.cache.purge()
}
4 changes: 4 additions & 0 deletions go/store/prolly/tree/testutils.go
@@ -333,6 +333,10 @@ func (v nodeStoreValidator) PutBlobBuilder(bb *BlobBuilder) {
v.bbp.Put(bb)
}

func (v nodeStoreValidator) PurgeCaches() {
v.ns.PurgeCaches()
}

func (v nodeStoreValidator) Format() *types.NomsBinFormat {
return v.ns.Format()
}
9 changes: 5 additions & 4 deletions go/store/types/value_store.go
@@ -51,6 +51,7 @@ type ValueReader interface {
// package that implements Value writing.
type ValueWriter interface {
WriteValue(ctx context.Context, v Value) (Ref, error)
PurgeCaches()
}

// ValueReadWriter is an interface that knows how to read and write Noms
@@ -739,10 +740,6 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
return chunks.ErrUnsupportedOperation
}

// TODO: The decodedChunks cache can potentially allow phantom reads of
// already collected chunks until we clear it...
lvs.decodedChunks.Purge()

if tfs, ok := lvs.cs.(chunks.TableFileStore); ok {
return tfs.PruneTableFiles(ctx)
}
@@ -809,6 +806,10 @@ func (lvs *ValueStore) gc(ctx context.Context,
return finalizer, sweeper.Close(ctx)
}

func (lvs *ValueStore) PurgeCaches() {
lvs.decodedChunks.Purge()
}

// Close closes the underlying ChunkStore
func (lvs *ValueStore) Close() error {
return lvs.cs.Close()
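
The value_store.go hunks are where the old TODO about phantom reads is resolved: decodedChunks used to be purged inside GC itself, late enough that already-collected chunks could still be served from the cache in the meantime. The call chain after this commit is summarized below in Go comment form; the mapping of DoltDB.vrw and DoltDB.ns onto ValueStore and the prolly-tree node store reflects the usual configuration, since this page's diff shows each method but not the wiring between them.

// Before: ValueStore.GC(...) { ...; decodedChunks.Purge(); ... }   // late in GC
//
// After:  safepoint controller BeginGC
//           └─ DoltDB.PurgeCaches
//                ├─ ValueStore.PurgeCaches -> decodedChunks.Purge()
//                └─ NodeStore.PurgeCaches  -> nodeCache.purge() -> stripe.purge()
//
// Because BeginGC runs with the keeper installed, cache entries that are still
// needed are repopulated via reads the keeper can see, while stale entries can
// no longer produce phantom reads of collected chunks.
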
3 changes: 3 additions & 0 deletions go/store/valuefile/file_value_store.go
@@ -161,6 +161,9 @@ func (f *FileValueStore) CacheHas(h hash.Hash) bool {
return ok
}

func (f *FileValueStore) PurgeCaches() {
}

// HasMany returns the set of hashes that are absent from the store
func (f *FileValueStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) {
f.chunkLock.Lock()
