From 18d7ceb847fd2d36a4bff073b9908217dc71a279 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 12 Feb 2025 13:58:58 -0800 Subject: [PATCH] [no-release-notes] go: sqle: dolt_gc: Purge read caches associated with the DoltDB as we begin GC. The right place to do this is in the safepoint controller, at BeginGC. We need to do it after keeperFunc is installed on the NomsBlockStore, so that all read dependencies are taken. We also need to do it before we register all sessions whose cached state we need to visit. --- go/libraries/doltcore/doltdb/doltdb.go | 9 ++++ .../doltcore/sqle/dprocedures/dolt_gc.go | 7 +++ go/store/prolly/tree/node_cache.go | 15 ++++++ go/store/prolly/tree/node_cache_test.go | 47 +++++++++++++++++++ go/store/prolly/tree/node_store.go | 6 +++ go/store/prolly/tree/testutils.go | 4 ++ go/store/types/value_store.go | 9 ++-- go/store/valuefile/file_value_store.go | 3 ++ 8 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 go/store/prolly/tree/node_cache_test.go diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index e27a397915c..c995185fc3f 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -2191,6 +2191,15 @@ func (ddb *DoltDB) PersistGhostCommits(ctx context.Context, ghostCommits hash.Ha return ddb.db.Database.PersistGhostCommitIDs(ctx, ghostCommits) } +// Purge in-memory read caches associated with this DoltDB. This needs +// to be done at a specific point during a GC operation to ensure that +// everything the application layer sees still exists in the database +// after the GC operation is completed. 
+func (ddb *DoltDB) PurgeCaches() { + ddb.vrw.PurgeCaches() + ddb.ns.PurgeCaches() +} + type FSCKReport struct { ChunkCount uint32 Problems []error diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index c2acc6f5f9e..35df17c6ec3 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -27,6 +27,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" @@ -73,10 +74,12 @@ var ErrServerPerformedGC = errors.New("this connection was established when this // invalidated in such a way that all future queries on it return an error. type killConnectionsSafepointController struct { callCtx *sql.Context + doltDB *doltdb.DoltDB origEpoch int } func (sc killConnectionsSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error { + sc.doltDB.PurgeCaches() return nil } @@ -157,6 +160,7 @@ type sessionAwareSafepointController struct { controller *dsess.GCSafepointController callCtx *sql.Context origEpoch int + doltDB *doltdb.DoltDB waiter *dsess.GCSafepointWaiter keeper func(hash.Hash) bool @@ -167,6 +171,7 @@ func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dses } func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error { + sc.doltDB.PurgeCaches() sc.keeper = keeper thisSess := dsess.DSessFromSess(sc.callCtx.Session) err := sc.visit(ctx, thisSess) @@ -256,11 +261,13 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { origEpoch: origepoch, callCtx: ctx, controller: gcSafepointController, + doltDB: ddb, } } else { sc = 
killConnectionsSafepointController{ origEpoch: origepoch, callCtx: ctx, + doltDB: ddb, } } err = ddb.GC(ctx, mode, sc) diff --git a/go/store/prolly/tree/node_cache.go b/go/store/prolly/tree/node_cache.go index 78f6b08eac8..ec264870405 100644 --- a/go/store/prolly/tree/node_cache.go +++ b/go/store/prolly/tree/node_cache.go @@ -48,6 +48,12 @@ func (c nodeCache) insert(addr hash.Hash, node Node) { s.insert(addr, node) } +func (c nodeCache) purge() { + for _, s := range c.stripes { + s.purge() + } +} + type centry struct { a hash.Hash n Node @@ -83,6 +89,15 @@ func removeFromList(e *centry) { e.next = e } +func (s *stripe) purge() { + s.mu.Lock() + defer s.mu.Unlock() + s.chunks = make(map[hash.Hash]*centry) + s.head = nil + s.sz = 0 + s.rev = 0 +} + func (s *stripe) moveToFront(e *centry) { e.i = s.rev s.rev++ diff --git a/go/store/prolly/tree/node_cache_test.go b/go/store/prolly/tree/node_cache_test.go new file mode 100644 index 00000000000..8141751c0d9 --- /dev/null +++ b/go/store/prolly/tree/node_cache_test.go @@ -0,0 +1,47 @@ +// Copyright 2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tree + +import ( + "testing" + + "github.com/dolthub/dolt/go/store/hash" + "github.com/stretchr/testify/assert" +) + +func TestNodeCache(t *testing.T) { + t.Run("InsertGetPurge", func(t *testing.T) { + // Simple smoke test of insert, get, purge. 
+ var addr hash.Hash + var n Node + n.msg = make([]byte, 1024) + cache := newChunkCache(64 * 1024) + for i := 0; i < 32; i++ { + addr[0] = byte(i) + cache.insert(addr, n) + } + for i := 0; i < 32; i++ { + addr[0] = byte(i) + _, ok := cache.get(addr) + assert.True(t, ok) + } + cache.purge() + for i := 0; i < 32; i++ { + addr[0] = byte(i) + _, ok := cache.get(addr) + assert.False(t, ok) + } + }) +} diff --git a/go/store/prolly/tree/node_store.go b/go/store/prolly/tree/node_store.go index 02c28373e4d..1e8151ad4e4 100644 --- a/go/store/prolly/tree/node_store.go +++ b/go/store/prolly/tree/node_store.go @@ -49,6 +49,8 @@ type NodeStore interface { BlobBuilder() *BlobBuilder PutBlobBuilder(*BlobBuilder) + + PurgeCaches() } type nodeStore struct { @@ -194,3 +196,7 @@ func (ns nodeStore) Format() *types.NomsBinFormat { } return nbf } + +func (ns nodeStore) PurgeCaches() { + ns.cache.purge() +} diff --git a/go/store/prolly/tree/testutils.go b/go/store/prolly/tree/testutils.go index b892cb0323e..7c98dd7d428 100644 --- a/go/store/prolly/tree/testutils.go +++ b/go/store/prolly/tree/testutils.go @@ -333,6 +333,10 @@ func (v nodeStoreValidator) PutBlobBuilder(bb *BlobBuilder) { v.bbp.Put(bb) } +func (v nodeStoreValidator) PurgeCaches() { + v.ns.PurgeCaches() +} + func (v nodeStoreValidator) Format() *types.NomsBinFormat { return v.ns.Format() } diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index fb348da918f..f22681547cc 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -51,6 +51,7 @@ type ValueReader interface { // package that implements Value writing. 
type ValueWriter interface { WriteValue(ctx context.Context, v Value) (Ref, error) + PurgeCaches() } // ValueReadWriter is an interface that knows how to read and write Noms @@ -739,10 +740,6 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe return chunks.ErrUnsupportedOperation } - // TODO: The decodedChunks cache can potentially allow phantom reads of - // already collected chunks until we clear it... - lvs.decodedChunks.Purge() - if tfs, ok := lvs.cs.(chunks.TableFileStore); ok { return tfs.PruneTableFiles(ctx) } @@ -809,6 +806,10 @@ func (lvs *ValueStore) gc(ctx context.Context, return finalizer, sweeper.Close(ctx) } +func (lvs *ValueStore) PurgeCaches() { + lvs.decodedChunks.Purge() +} + // Close closes the underlying ChunkStore func (lvs *ValueStore) Close() error { return lvs.cs.Close() diff --git a/go/store/valuefile/file_value_store.go b/go/store/valuefile/file_value_store.go index a6b868b36f8..cb81fc053f1 100644 --- a/go/store/valuefile/file_value_store.go +++ b/go/store/valuefile/file_value_store.go @@ -161,6 +161,9 @@ func (f *FileValueStore) CacheHas(h hash.Hash) bool { return ok } +func (f *FileValueStore) PurgeCaches() { +} + // HasMany returns the set of hashes that are absent from the store func (f *FileValueStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) { f.chunkLock.Lock()