diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index e27a397915c..c995185fc3f 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -2191,6 +2191,15 @@ func (ddb *DoltDB) PersistGhostCommits(ctx context.Context, ghostCommits hash.Ha return ddb.db.Database.PersistGhostCommitIDs(ctx, ghostCommits) } +// Purge in-memory read caches associated with this DoltDB. This needs +// to be done at a specific point during a GC operation to ensure that +// everything the application layer sees still exists in the database +// after the GC operation is completed. +func (ddb *DoltDB) PurgeCaches() { + ddb.vrw.PurgeCaches() + ddb.ns.PurgeCaches() +} + type FSCKReport struct { ChunkCount uint32 Problems []error diff --git a/go/libraries/doltcore/doltdb/gc_test.go b/go/libraries/doltcore/doltdb/gc_test.go index d5bfc425617..377bf51bc07 100644 --- a/go/libraries/doltcore/doltdb/gc_test.go +++ b/go/libraries/doltcore/doltdb/gc_test.go @@ -139,7 +139,8 @@ func testGarbageCollection(t *testing.T, test gcTest) { } } - err := dEnv.DoltDB(ctx).GC(ctx, types.GCModeDefault, nil) + ddb := dEnv.DoltDB(ctx) + err := ddb.GC(ctx, types.GCModeDefault, purgingSafepointController{ddb}) require.NoError(t, err) test.postGCFunc(ctx, t, dEnv.DoltDB(ctx), res) @@ -208,7 +209,7 @@ func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) { _, err = ns.Write(ctx, c1.Node()) require.NoError(t, err) - err = ddb.GC(ctx, types.GCModeDefault, nil) + err = ddb.GC(ctx, types.GCModeDefault, purgingSafepointController{ddb}) require.NoError(t, err) c2 := newIntMap(t, ctx, ns, 2, 2) @@ -265,3 +266,25 @@ func newAddrMap(t *testing.T, ctx context.Context, ns tree.NodeStore, key string return m } + +type purgingSafepointController struct { + ddb *doltdb.DoltDB +} + +var _ (types.GCSafepointController) = purgingSafepointController{} + +func (c purgingSafepointController) BeginGC(ctx context.Context, keeper func(h hash.Hash) bool) error { + c.ddb.PurgeCaches() + return nil +} + +func (c purgingSafepointController) EstablishPreFinalizeSafepoint(context.Context) error { + return nil +} + +func (c purgingSafepointController) EstablishPostFinalizeSafepoint(context.Context) error { + return nil +} + +func (c purgingSafepointController) CancelSafepoint() { +} diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index c2acc6f5f9e..7d08abc1f03 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -27,6 +27,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" @@ -73,10 +74,12 @@ var ErrServerPerformedGC = errors.New("this connection was established when this // invalidated in such a way that all future queries on it return an error. type killConnectionsSafepointController struct { callCtx *sql.Context + doltDB *doltdb.DoltDB origEpoch int } func (sc killConnectionsSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error { + sc.doltDB.PurgeCaches() return nil } @@ -127,23 +130,24 @@ func (sc killConnectionsSafepointController) EstablishPostFinalizeSafepoint(ctx params := backoff.NewExponentialBackOff() params.InitialInterval = 1 * time.Millisecond params.MaxInterval = 25 * time.Millisecond - params.MaxElapsedTime = 3 * time.Second + params.MaxElapsedTime = 10 * time.Second + var unkilled map[uint32]struct{} err = backoff.Retry(func() error { + unkilled = make(map[uint32]struct{}) processes := sc.callCtx.ProcessList.Processes() - allgood := true for _, p := range processes { if _, ok := killed[p.Connection]; ok { - allgood = false + unkilled[p.Connection] = struct{}{} sc.callCtx.ProcessList.Kill(p.Connection) } } - if !allgood { - return errors.New("unable to establish safepoint.") + if len(unkilled) > 0 { + return errors.New("could not establish safepont") } return nil }, params) if err != nil { - return err + return fmt.Errorf("%w: still saw these connections in the process list: %v", err, unkilled) } sc.callCtx.Session.SetTransaction(nil) dsess.DSessFromSess(sc.callCtx.Session).SetValidateErr(ErrServerPerformedGC) @@ -157,6 +161,7 @@ type sessionAwareSafepointController struct { controller *dsess.GCSafepointController callCtx *sql.Context origEpoch int + doltDB *doltdb.DoltDB waiter *dsess.GCSafepointWaiter keeper func(hash.Hash) bool @@ -167,6 +172,7 @@ func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dses } func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error { + sc.doltDB.PurgeCaches() sc.keeper = keeper thisSess := dsess.DSessFromSess(sc.callCtx.Session) err := sc.visit(ctx, thisSess) @@ -256,11 +262,13 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { origEpoch: origepoch, callCtx: ctx, controller: gcSafepointController, + doltDB: ddb, } } else { sc = killConnectionsSafepointController{ origEpoch: origepoch, callCtx: ctx, + doltDB: ddb, } } err = ddb.GC(ctx, mode, sc) diff --git a/go/store/nbs/gc_copier.go b/go/store/nbs/gc_copier.go index 44c02df8ef7..77d9c46b12b 100644 --- a/go/store/nbs/gc_copier.go +++ b/go/store/nbs/gc_copier.go @@ -73,14 +73,14 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err e return nil, err } + defer func() { + gcc.writer.Cancel() + }() + if gcc.writer.ChunkCount() == 0 { return []tableSpec{}, nil } - defer func() { - _ = gcc.writer.Remove() - }() - addr, ok := hash.MaybeParse(filename) if !ok { return nil, fmt.Errorf("invalid filename: %s", filename) diff --git a/go/store/nbs/journal_chunk_source.go b/go/store/nbs/journal_chunk_source.go index 10a5548b0c8..da977441ced 100644 --- a/go/store/nbs/journal_chunk_source.go +++ b/go/store/nbs/journal_chunk_source.go @@ -145,12 +145,19 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup. return jReqs[i].r.Offset < jReqs[j].r.Offset }) + wg.Add(len(jReqs)) + go func() { + wg.Wait() + s.journal.lock.RUnlock() + }() for i := range jReqs { // workers populate the parent error group // record local workers for releasing lock - wg.Add(1) eg.Go(func() error { defer wg.Done() + if ctx.Err() != nil { + return ctx.Err() + } rec := jReqs[i] a := reqs[rec.idx].a if cc, err := s.journal.getCompressedChunkAtRange(rec.r, *a); err != nil { @@ -163,10 +170,6 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup. } }) } - go func() { - wg.Wait() - s.journal.lock.RUnlock() - }() return remaining, gcBehavior_Continue, nil } diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index f68b23af2ac..978ad5c77d1 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -23,6 +23,7 @@ import ( "math/rand" "os" "path/filepath" + "sync" "testing" "time" @@ -656,3 +657,28 @@ func TestGuessPrefixOrdinal(t *testing.T) { assert.Equal(t, i, guess) } } + +func TestWaitForGC(t *testing.T) { + // Wait for GC should always return when the context is canceled... + nbs := &NomsBlockStore{} + nbs.cond = sync.NewCond(&nbs.mu) + nbs.gcInProgress = true + const numThreads = 32 + cancels := make([]func(), 0, numThreads) + var wg sync.WaitGroup + wg.Add(numThreads) + for i := 0; i < numThreads; i++ { + ctx, cancel := context.WithCancel(context.Background()) + cancels = append(cancels, cancel) + go func() { + defer wg.Done() + nbs.mu.Lock() + defer nbs.mu.Unlock() + nbs.waitForGC(ctx) + }() + } + for _, c := range cancels { + c() + } + wg.Wait() +} diff --git a/go/store/prolly/tree/node_cache.go b/go/store/prolly/tree/node_cache.go index 78f6b08eac8..ec264870405 100644 --- a/go/store/prolly/tree/node_cache.go +++ b/go/store/prolly/tree/node_cache.go @@ -48,6 +48,12 @@ func (c nodeCache) insert(addr hash.Hash, node Node) { s.insert(addr, node) } +func (c nodeCache) purge() { + for _, s := range c.stripes { + s.purge() + } +} + type centry struct { a hash.Hash n Node @@ -83,6 +89,15 @@ func removeFromList(e *centry) { e.next = e } +func (s *stripe) purge() { + s.mu.Lock() + defer s.mu.Unlock() + s.chunks = make(map[hash.Hash]*centry) + s.head = nil + s.sz = 0 + s.rev = 0 +} + func (s *stripe) moveToFront(e *centry) { e.i = s.rev s.rev++ diff --git a/go/store/prolly/tree/node_cache_test.go b/go/store/prolly/tree/node_cache_test.go new file mode 100644 index 00000000000..5a0aabace40 --- /dev/null +++ b/go/store/prolly/tree/node_cache_test.go @@ -0,0 +1,48 @@ +// Copyright 2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tree + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dolthub/dolt/go/store/hash" +) + +func TestNodeCache(t *testing.T) { + t.Run("InsertGetPurge", func(t *testing.T) { + // Simple smoke screen test of insert, get, purge. + var addr hash.Hash + var n Node + n.msg = make([]byte, 1024) + cache := newChunkCache(64 * 1024) + for i := 0; i < 32; i++ { + addr[0] = byte(i) + cache.insert(addr, n) + } + for i := 0; i < 32; i++ { + addr[0] = byte(i) + _, ok := cache.get(addr) + assert.True(t, ok) + } + cache.purge() + for i := 0; i < 32; i++ { + addr[0] = byte(i) + _, ok := cache.get(addr) + assert.False(t, ok) + } + }) +} diff --git a/go/store/prolly/tree/node_store.go b/go/store/prolly/tree/node_store.go index 02c28373e4d..8733393e9a0 100644 --- a/go/store/prolly/tree/node_store.go +++ b/go/store/prolly/tree/node_store.go @@ -49,6 +49,12 @@ type NodeStore interface { BlobBuilder() *BlobBuilder PutBlobBuilder(*BlobBuilder) + + // Delete any cached chunks associated with this NodeStore. + // Used by GC during safepoint establishment to ensure deleted + // chunks do not float around in the application layer after GC + // completes. + PurgeCaches() } type nodeStore struct { @@ -194,3 +200,7 @@ func (ns nodeStore) Format() *types.NomsBinFormat { } return nbf } + +func (ns nodeStore) PurgeCaches() { + ns.cache.purge() +} diff --git a/go/store/prolly/tree/testutils.go b/go/store/prolly/tree/testutils.go index b892cb0323e..7c98dd7d428 100644 --- a/go/store/prolly/tree/testutils.go +++ b/go/store/prolly/tree/testutils.go @@ -333,6 +333,10 @@ func (v nodeStoreValidator) PutBlobBuilder(bb *BlobBuilder) { v.bbp.Put(bb) } +func (v nodeStoreValidator) PurgeCaches() { + v.ns.PurgeCaches() +} + func (v nodeStoreValidator) Format() *types.NomsBinFormat { return v.ns.Format() } diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index fb348da918f..f22681547cc 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -51,6 +51,7 @@ type ValueReader interface { // package that implements Value writing. type ValueWriter interface { WriteValue(ctx context.Context, v Value) (Ref, error) + PurgeCaches() } // ValueReadWriter is an interface that knows how to read and write Noms @@ -739,10 +740,6 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe return chunks.ErrUnsupportedOperation } - // TODO: The decodedChunks cache can potentially allow phantom reads of - // already collected chunks until we clear it... - lvs.decodedChunks.Purge() - if tfs, ok := lvs.cs.(chunks.TableFileStore); ok { return tfs.PruneTableFiles(ctx) } @@ -809,6 +806,10 @@ func (lvs *ValueStore) gc(ctx context.Context, return finalizer, sweeper.Close(ctx) } +func (lvs *ValueStore) PurgeCaches() { + lvs.decodedChunks.Purge() +} + // Close closes the underlying ChunkStore func (lvs *ValueStore) Close() error { return lvs.cs.Close() diff --git a/go/store/types/value_store_test.go b/go/store/types/value_store_test.go index d797671c04c..82a54b5c61e 100644 --- a/go/store/types/value_store_test.go +++ b/go/store/types/value_store_test.go @@ -198,7 +198,7 @@ func TestGC(t *testing.T) { require.NoError(t, err) assert.NotNil(v2) - err = vs.GC(ctx, GCModeDefault, hash.HashSet{}, hash.HashSet{}, nil) + err = vs.GC(ctx, GCModeDefault, hash.HashSet{}, hash.HashSet{}, purgingSafepointController{vs}) require.NoError(t, err) v1, err = vs.ReadValue(ctx, h1) // non-nil @@ -216,3 +216,25 @@ type badVersionStore struct { func (b *badVersionStore) Version() string { return "BAD" } + +type purgingSafepointController struct { + vs *ValueStore +} + +var _ (GCSafepointController) = purgingSafepointController{} + +func (c purgingSafepointController) BeginGC(ctx context.Context, keeper func(h hash.Hash) bool) error { + c.vs.PurgeCaches() + return nil +} + +func (c purgingSafepointController) EstablishPreFinalizeSafepoint(context.Context) error { + return nil +} + +func (c purgingSafepointController) EstablishPostFinalizeSafepoint(context.Context) error { + return nil +} + +func (c purgingSafepointController) CancelSafepoint() { +} diff --git a/go/store/valuefile/file_value_store.go b/go/store/valuefile/file_value_store.go index a6b868b36f8..cb81fc053f1 100644 --- a/go/store/valuefile/file_value_store.go +++ b/go/store/valuefile/file_value_store.go @@ -161,6 +161,9 @@ func (f *FileValueStore) CacheHas(h hash.Hash) bool { return ok } +func (f *FileValueStore) PurgeCaches() { +} + // HasMany returns the set of hashes that are absent from the store func (f *FileValueStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) { f.chunkLock.Lock() diff --git a/integration-tests/go-sql-server-driver/concurrent_gc_test.go b/integration-tests/go-sql-server-driver/concurrent_gc_test.go index 9babe141850..f7c1094975c 100644 --- a/integration-tests/go-sql-server-driver/concurrent_gc_test.go +++ b/integration-tests/go-sql-server-driver/concurrent_gc_test.go @@ -116,6 +116,7 @@ func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int) return nil } } else if err != nil { + require.NotContains(t, err.Error(), "connection refused") t.Logf("err in Conn: %v", err) return nil } @@ -162,6 +163,7 @@ func (gct gcTest) doGC(t *testing.T, ctx context.Context, db *sql.DB) error { return nil } } else if err != nil { + require.NotContains(t, err.Error(), "connection refused") t.Logf("err in Conn for dolt_gc: %v", err) return nil }