Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[no-release-notes] go: sqle: dolt_gc: Purge read caches associated with the DoltDB as we begin GC. #8858

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2191,6 +2191,15 @@ func (ddb *DoltDB) PersistGhostCommits(ctx context.Context, ghostCommits hash.Ha
return ddb.db.Database.PersistGhostCommitIDs(ctx, ghostCommits)
}

// PurgeCaches purges the in-memory read caches associated with this
// DoltDB. This needs to be done at a specific point during a GC
// operation to ensure that everything the application layer sees still
// exists in the database after the GC operation is completed.
func (ddb *DoltDB) PurgeCaches() {
	// Purge both the value-read-writer cache and the node store cache.
	ddb.vrw.PurgeCaches()
	ddb.ns.PurgeCaches()
}

type FSCKReport struct {
ChunkCount uint32
Problems []error
Expand Down
7 changes: 7 additions & 0 deletions go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
Expand Down Expand Up @@ -73,10 +74,12 @@ var ErrServerPerformedGC = errors.New("this connection was established when this
// invalidated in such a way that all future queries on it return an error.
type killConnectionsSafepointController struct {
	// callCtx is the context of the session that initiated the GC call.
	callCtx *sql.Context
	// doltDB is the database being collected; its read caches are purged
	// when the GC begins (see BeginGC).
	doltDB *doltdb.DoltDB
	// origEpoch is the GC epoch captured before the operation started;
	// presumably used to detect a concurrent GC — TODO(review) confirm.
	origEpoch int
}

// BeginGC is called as the GC operation starts. It purges the DoltDB's
// in-memory read caches so that no cached chunks survive past the
// collection. The |keeper| callback is unused by this controller.
func (sc killConnectionsSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
	sc.doltDB.PurgeCaches()
	return nil
}

Expand Down Expand Up @@ -157,6 +160,7 @@ type sessionAwareSafepointController struct {
controller *dsess.GCSafepointController
callCtx *sql.Context
origEpoch int
doltDB *doltdb.DoltDB

waiter *dsess.GCSafepointWaiter
keeper func(hash.Hash) bool
Expand All @@ -167,6 +171,7 @@ func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dses
}

func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
sc.doltDB.PurgeCaches()
sc.keeper = keeper
thisSess := dsess.DSessFromSess(sc.callCtx.Session)
err := sc.visit(ctx, thisSess)
Expand Down Expand Up @@ -256,11 +261,13 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
origEpoch: origepoch,
callCtx: ctx,
controller: gcSafepointController,
doltDB: ddb,
}
} else {
sc = killConnectionsSafepointController{
origEpoch: origepoch,
callCtx: ctx,
doltDB: ddb,
}
}
err = ddb.GC(ctx, mode, sc)
Expand Down
15 changes: 15 additions & 0 deletions go/store/prolly/tree/node_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func (c nodeCache) insert(addr hash.Hash, node Node) {
s.insert(addr, node)
}

// purge evicts every cached entry from all stripes of the cache.
func (c nodeCache) purge() {
	for i := range c.stripes {
		c.stripes[i].purge()
	}
}

type centry struct {
a hash.Hash
n Node
Expand Down Expand Up @@ -83,6 +89,15 @@ func removeFromList(e *centry) {
e.next = e
}

// purge drops every entry from this stripe, returning it to its
// freshly-constructed state.
func (s *stripe) purge() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.head = nil
	s.sz = 0
	s.rev = 0
	s.chunks = map[hash.Hash]*centry{}
}

func (s *stripe) moveToFront(e *centry) {
e.i = s.rev
s.rev++
Expand Down
48 changes: 48 additions & 0 deletions go/store/prolly/tree/node_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tree

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/dolthub/dolt/go/store/hash"
)

func TestNodeCache(t *testing.T) {
t.Run("InsertGetPurge", func(t *testing.T) {
// Simple smoke screen test of insert, get, purge.
var addr hash.Hash
var n Node
n.msg = make([]byte, 1024)
cache := newChunkCache(64 * 1024)
for i := 0; i < 32; i++ {
addr[0] = byte(i)
cache.insert(addr, n)
}
for i := 0; i < 32; i++ {
addr[0] = byte(i)
_, ok := cache.get(addr)
assert.True(t, ok)
}
cache.purge()
for i := 0; i < 32; i++ {
addr[0] = byte(i)
_, ok := cache.get(addr)
assert.False(t, ok)
}
})
}
6 changes: 6 additions & 0 deletions go/store/prolly/tree/node_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type NodeStore interface {

BlobBuilder() *BlobBuilder
PutBlobBuilder(*BlobBuilder)

PurgeCaches()
}

type nodeStore struct {
Expand Down Expand Up @@ -194,3 +196,7 @@ func (ns nodeStore) Format() *types.NomsBinFormat {
}
return nbf
}

// PurgeCaches implements NodeStore. It evicts every entry from the
// in-memory node cache.
func (ns nodeStore) PurgeCaches() {
	ns.cache.purge()
}
4 changes: 4 additions & 0 deletions go/store/prolly/tree/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func (v nodeStoreValidator) PutBlobBuilder(bb *BlobBuilder) {
v.bbp.Put(bb)
}

// PurgeCaches implements NodeStore by delegating to the wrapped store.
func (v nodeStoreValidator) PurgeCaches() {
	v.ns.PurgeCaches()
}

func (v nodeStoreValidator) Format() *types.NomsBinFormat {
return v.ns.Format()
}
9 changes: 5 additions & 4 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ValueReader interface {
// package that implements Value writing.
// package that implements Value writing.
type ValueWriter interface {
	WriteValue(ctx context.Context, v Value) (Ref, error)
	// PurgeCaches evicts any in-memory read caches held by the
	// implementation. NOTE(review): a cache-purge method is an unusual fit
	// for a writer interface — confirm this is the intended home for it.
	PurgeCaches()
}

// ValueReadWriter is an interface that knows how to read and write Noms
Expand Down Expand Up @@ -739,10 +740,6 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
return chunks.ErrUnsupportedOperation
}

// TODO: The decodedChunks cache can potentially allow phantom reads of
// already collected chunks until we clear it...
lvs.decodedChunks.Purge()

if tfs, ok := lvs.cs.(chunks.TableFileStore); ok {
return tfs.PruneTableFiles(ctx)
}
Expand Down Expand Up @@ -809,6 +806,10 @@ func (lvs *ValueStore) gc(ctx context.Context,
return finalizer, sweeper.Close(ctx)
}

// PurgeCaches drops the decoded-chunk read cache. Purging at the start of
// a GC operation prevents phantom reads of chunks that the collector may
// discard.
func (lvs *ValueStore) PurgeCaches() {
	lvs.decodedChunks.Purge()
}

// Close closes the underlying ChunkStore
func (lvs *ValueStore) Close() error {
return lvs.cs.Close()
Expand Down
3 changes: 3 additions & 0 deletions go/store/valuefile/file_value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (f *FileValueStore) CacheHas(h hash.Hash) bool {
return ok
}

// PurgeCaches implements the interface; FileValueStore appears to keep no
// purgeable read caches, so this is a no-op.
func (f *FileValueStore) PurgeCaches() {
}

// HasMany returns the set of hashes that are absent from the store
func (f *FileValueStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) {
f.chunkLock.Lock()
Expand Down
Loading