diff --git a/mutable_tree.go b/mutable_tree.go
index 9ff5a3c10..939197454 100644
--- a/mutable_tree.go
+++ b/mutable_tree.go
@@ -1030,7 +1030,7 @@ func (tree *MutableTree) balance(node *Node) (newSelf *Node, err error) {
 }
 
 // saveNewNodes save new created nodes by the changes of the working tree.
-// NOTE: This function clears leftNode/rigthNode recursively and
+// NOTE: This function clears leftNode/rightNode recursively and
 // calls _hash() on the given node.
 func (tree *MutableTree) saveNewNodes(version int64) error {
 	nonce := uint32(0)
diff --git a/nodedb.go b/nodedb.go
index 6e9b1e2a4..4c6a49755 100644
--- a/nodedb.go
+++ b/nodedb.go
@@ -25,51 +25,61 @@ const (
 	int32Size = 4
 	int64Size = 8
 	hashSize  = sha256.Size
-	genesisVersion = 1
 	storageVersionKey = "storage_version"
-	// We store latest saved version together with storage version delimited by the constant below.
-	// This delimiter is valid only if fast storage is enabled (i.e. storageVersion >= fastStorageVersionValue).
+	// We store the latest saved version together with storage version delimited by the constant below.
+	// This delimiter is valid only if fast storage is enabled (i.e., storageVersion >= fastStorageVersionValue).
 	// The latest saved version is needed for protection against downgrade and re-upgrade. In such a case, it would
-	// be possible to observe mismatch between the latest version state and the fast nodes on disk.
+	// be possible to observe a mismatch between the latest version state and the fast nodes on disk.
 	// Therefore, we would like to detect that and overwrite fast nodes on disk with the latest version state.
 	fastStorageVersionDelimiter = "-"
 	// Using semantic versioning: https://semver.org/
 	defaultStorageVersionValue = "1.0.0"
 	fastStorageVersionValue    = "1.1.0"
 	fastNodeCacheSize          = 100000
+
+	nodeKeyPrefix         = 's'
+	fastKeyPrefix         = 'f'
+	metadataKeyPrefix     = 'm'
+	legacyNodeKeyPrefix   = 'n'
+	legacyOrphanKeyPrefix = 'o'
+	legacyRootKeyPrefix   = 'r'
 )
 
 var (
 	// All new node keys are prefixed with the byte 's'. This ensures no collision is
-	// possible with the legacy nodes, and makes them easier to traverse. They are indexed by the version and the local nonce.
-	nodeKeyFormat = keyformat.NewFastPrefixFormatter('s', int64Size+int32Size) // s<version><nonce>
+	// possible with the legacy nodes and makes them easier to traverse. They are indexed by the version and the local nonce.
+	nodeKeyFormat = keyformat.NewFastPrefixFormatter(nodeKeyPrefix, int64Size+int32Size) // s<version><nonce>
 
-	// This is only used for the iteration purpose.
-	nodeKeyPrefixFormat = keyformat.NewFastPrefixFormatter('s', int64Size) // s<version>
+	// This is only used for iteration.
+	nodeKeyPrefixFormat = keyformat.NewFastPrefixFormatter(nodeKeyPrefix, int64Size) // s<version>
 
 	// Key Format for making reads and iterates go through a data-locality preserving db.
 	// The value at an entry will list what version it was written to.
-	// Then to query values, you first query state via this fast method.
-	// If its present, then check the tree version. If tree version >= result_version,
-	// return result_version. Else, go through old (slow) IAVL get method that walks through tree.
-	fastKeyFormat = keyformat.NewKeyFormat('f', 0) // f<keystring>
+	// Then, to query values, you first query state via this fast method.
+	// If present, then check the tree version. If tree version >= result_version,
+	// return result_version. Else, go through the old (slow) IAVL get method that walks through the tree.
+	fastKeyFormat = keyformat.NewKeyFormat(fastKeyPrefix, 0) // f<keystring>
 
 	// Key Format for storing metadata about the chain such as the version number.
 	// The value at an entry will be in a variable format and up to the caller to
 	// decide how to parse.
-	metadataKeyFormat = keyformat.NewKeyFormat('m', 0) // m<keystring>
+	metadataKeyFormat = keyformat.NewKeyFormat(metadataKeyPrefix, 0) // m<keystring>
 
 	// All legacy node keys are prefixed with the byte 'n'.
-	legacyNodeKeyFormat = keyformat.NewFastPrefixFormatter('n', hashSize) // n<hash>
+	legacyNodeKeyFormat = keyformat.NewFastPrefixFormatter(legacyNodeKeyPrefix, hashSize) // n<hash>
 
 	// All legacy orphan keys are prefixed with the byte 'o'.
-	legacyOrphanKeyFormat = keyformat.NewKeyFormat('o', int64Size, int64Size, hashSize) // o<last-version><first-version><hash>
+	legacyOrphanKeyFormat = keyformat.NewKeyFormat(legacyOrphanKeyPrefix, int64Size, int64Size, hashSize) // o<last-version><first-version><hash>
 
 	// All legacy root keys are prefixed with the byte 'r'.
-	legacyRootKeyFormat = keyformat.NewKeyFormat('r', int64Size) // r<version>
+	legacyRootKeyFormat = keyformat.NewKeyFormat(legacyRootKeyPrefix, int64Size) // r<version>
 )
 
-var errInvalidFastStorageVersion = fmt.Errorf("fast storage version must be in the format %s", fastStorageVersionDelimiter)
+var (
+	ErrNodeMissingNodeKey = errors.New("node does not have a nodeKey")
+
+	errInvalidFastStorageVersion = fmt.Errorf("fast storage version must be in the format %s", fastStorageVersionDelimiter)
+)
 
 type nodeDB struct {
 	ctx context.Context
@@ -148,9 +158,9 @@ func (ndb *nodeDB) GetNode(nk []byte) (*Node, error) {
 	ndb.opts.Stat.IncCacheMissCnt()
 
 	// Doesn't exist, load.
-	isLegcyNode := len(nk) == hashSize
+	isLegacyNode := len(nk) == hashSize
 	var nodeKey []byte
-	if isLegcyNode {
+	if isLegacyNode {
 		nodeKey = ndb.legacyNodeKey(nk)
 	} else {
 		nodeKey = ndb.nodeKey(nk)
@@ -159,8 +169,8 @@ func (ndb *nodeDB) GetNode(nk []byte) (*Node, error) {
 	if err != nil {
 		return nil, fmt.Errorf("can't get node %v: %v", nk, err)
 	}
-	if buf == nil && !isLegcyNode {
-		// if the node is reformatted by pruning, check against (version, 0)
+	if buf == nil && !isLegacyNode {
+		// if pruning reformats the node, check against (version, 0)
 		nKey := GetNodeKey(nk)
 		if nKey.nonce == 1 {
 			nodeKey = ndb.nodeKey((&NodeKey{
@@ -174,11 +184,11 @@ func (ndb *nodeDB) GetNode(nk []byte) (*Node, error) {
 		}
 	}
 	if buf == nil {
-		return nil, fmt.Errorf("Value missing for key %v corresponding to nodeKey %x", nk, nodeKey)
+		return nil, fmt.Errorf("value missing for key %v corresponding to nodeKey %x", nk, nodeKey)
 	}
 
 	var node *Node
-	if isLegcyNode {
+	if isLegacyNode {
 		node, err = MakeLegacyNode(nk, buf)
 		if err != nil {
 			return nil, fmt.Errorf("error reading Legacy Node. bytes: %x, error: %v", buf, err)
@@ -416,15 +426,15 @@ func (ndb *nodeDB) saveNodeFromPruning(node *Node) error {
 	return ndb.batch.Set(ndb.nodeKey(node.GetKey()), buf.Bytes())
 }
 
-// rootkey cache of two elements, attempting to mimic a direct-mapped cache.
-type rootkeyCache struct {
-	// initial value is set to {-1, -1}, which is an invalid version for a getrootkey call.
+// rootKeyCache is a cache of two elements, attempting to mimic a direct-mapped cache.
+type rootKeyCache struct {
+	// the initial value is set to {-1, -1}, which is an invalid version for a getRootKey call.
 	versions [2]int64
 	rootKeys [2][]byte
 	next     int
 }
 
-func (rkc *rootkeyCache) getRootKey(ndb *nodeDB, version int64) ([]byte, error) {
+func (rkc *rootKeyCache) getRootKey(ndb *nodeDB, version int64) ([]byte, error) {
 	// Check both cache entries
 	for i := 0; i < 2; i++ {
 		if rkc.versions[i] == version {
@@ -440,15 +450,15 @@ func (rkc *rootkeyCache) getRootKey(ndb *nodeDB, version int64) ([]byte, error)
 	return rootKey, nil
 }
 
-func (rkc *rootkeyCache) setRootKey(version int64, rootKey []byte) {
-	// Store in next available slot, cycling between 0 and 1
+func (rkc *rootKeyCache) setRootKey(version int64, rootKey []byte) {
+	// Store in the next available slot, cycling between 0 and 1
 	rkc.versions[rkc.next] = version
 	rkc.rootKeys[rkc.next] = rootKey
 	rkc.next = (rkc.next + 1) % 2
 }
 
-func newRootkeyCache() *rootkeyCache {
-	return &rootkeyCache{
+func newRootKeyCache() *rootKeyCache {
+	return &rootKeyCache{
 		versions: [2]int64{-1, -1},
 		rootKeys: [2][]byte{},
 		next:     0,
@@ -457,7 +467,7 @@ func newRootkeyCache() *rootkeyCache {
 
 // deleteVersion deletes a tree version from disk.
 // deletes orphans
-func (ndb *nodeDB) deleteVersion(version int64, cache *rootkeyCache) error {
+func (ndb *nodeDB) deleteVersion(version int64, cache *rootKeyCache) error {
 	rootKey, err := cache.getRootKey(ndb, version)
 	if err != nil && !errors.Is(err, ErrVersionDoesNotExist) {
 		return err
@@ -468,9 +478,9 @@ func (ndb *nodeDB) deleteVersion(version int64, cache *rootkeyCache) error {
 	}
 
 	if rootKey != nil {
-		if err := ndb.traverseOrphansWithRootkeyCache(cache, version, version+1, func(orphan *Node) error {
+		if err := ndb.traverseOrphansWithRootKeyCache(cache, version, version+1, func(orphan *Node) error {
 			if orphan.nodeKey.nonce == 0 && !orphan.isLegacy {
-				// if the orphan is a reformatted root, it can be a legacy root
+				// if the orphan is a reformatted root, it can be a legacy root,
 				// so it should be removed from the pruning process.
 				if err := ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)); err != nil {
 					return err
@@ -633,7 +643,7 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error {
 		return err
 	}
 
-	// NOTICE: we don't touch fast node indexes here, because it'll be rebuilt later because of version mismatch.
+	// NOTICE: we don't touch fast node indexes here because they'll be rebuilt later due to the version mismatch.
 
 	ndb.resetLatestVersion(dumpFromVersion - 1)
 
@@ -737,9 +747,8 @@ func (ndb *nodeDB) deleteVersionsTo(toVersion int64) error {
 		ndb.resetLegacyLatestVersion(-1)
 	}
 
-	rootkeyCache := newRootkeyCache()
 	for version := first; version <= toVersion; version++ {
-		if err := ndb.deleteVersion(version, rootkeyCache); err != nil {
+		if err := ndb.deleteVersion(version, newRootKeyCache()); err != nil {
 			return err
 		}
 		ndb.resetFirstVersion(version + 1)
@@ -1155,11 +1164,10 @@ func isReferenceRoot(bz []byte) (bool, int) {
 // traverseOrphans traverses orphans which removed by the updates of the curVersion in the prevVersion.
 // NOTE: it is used for both legacy and new nodes.
 func (ndb *nodeDB) traverseOrphans(prevVersion, curVersion int64, fn func(*Node) error) error {
-	cache := newRootkeyCache()
-	return ndb.traverseOrphansWithRootkeyCache(cache, prevVersion, curVersion, fn)
+	return ndb.traverseOrphansWithRootKeyCache(newRootKeyCache(), prevVersion, curVersion, fn)
 }
 
-func (ndb *nodeDB) traverseOrphansWithRootkeyCache(cache *rootkeyCache, prevVersion, curVersion int64, fn func(*Node) error) error {
+func (ndb *nodeDB) traverseOrphansWithRootKeyCache(cache *rootKeyCache, prevVersion, curVersion int64, fn func(*Node) error) error {
 	curKey, err := cache.getRootKey(ndb, curVersion)
 	if err != nil {
 		return err
@@ -1232,7 +1240,7 @@ func (ndb *nodeDB) Close() error {
 // Utility and test functions
 
 func (ndb *nodeDB) leafNodes() ([]*Node, error) {
-	leaves := []*Node{}
+	var leaves []*Node
 
 	err := ndb.traverseNodes(func(node *Node) error {
 		if node.isLeaf() {
@@ -1248,7 +1256,7 @@ func (ndb *nodeDB) leafNodes() ([]*Node, error) {
 }
 
 func (ndb *nodeDB) nodes() ([]*Node, error) {
-	nodes := []*Node{}
+	var nodes []*Node
 
 	err := ndb.traverseNodes(func(node *Node) error {
 		nodes = append(nodes, node)
@@ -1262,7 +1270,7 @@ func (ndb *nodeDB) nodes() ([]*Node, error) {
 }
 
 func (ndb *nodeDB) legacyNodes() ([]*Node, error) {
-	nodes := []*Node{}
+	var nodes []*Node
 
 	err := ndb.traversePrefix(legacyNodeKeyFormat.Prefix(), func(key, value []byte) error {
 		node, err := MakeLegacyNode(key[1:], value)
@@ -1280,7 +1288,7 @@ func (ndb *nodeDB) legacyNodes() ([]*Node, error) {
 }
 
 func (ndb *nodeDB) orphans() ([][]byte, error) {
-	orphans := [][]byte{}
+	var orphans [][]byte
 
 	for version := ndb.firstVersion; version < ndb.latestVersion; version++ {
 		err := ndb.traverseOrphans(version, version+1, func(orphan *Node) error {
@@ -1313,7 +1321,7 @@ func (ndb *nodeDB) size() int {
 }
 
 func (ndb *nodeDB) traverseNodes(fn func(node *Node) error) error {
-	nodes := []*Node{}
+	var nodes []*Node
 
 	if err := ndb.traversePrefix(nodeKeyFormat.Prefix(), func(key, value []byte) error {
 		if isRef, _ := isReferenceRoot(value); isRef {
@@ -1341,7 +1349,7 @@ func (ndb *nodeDB) traverseNodes(fn func(node *Node) error) error {
 	return nil
 }
 
-// traverseStateChanges iterate the range of versions, compare each version to it's predecessor to extract the state changes of it.
+// traverseStateChanges iterates the range of versions, comparing each version to its predecessor to extract its state changes.
 // endVersion is exclusive, set to `math.MaxInt64` to cover the latest version.
 func (ndb *nodeDB) traverseStateChanges(startVersion, endVersion int64, fn func(version int64, changeSet *ChangeSet) error) error {
 	firstVersion, err := ndb.getFirstVersion()
@@ -1361,7 +1369,7 @@ func (ndb *nodeDB) traverseStateChanges(startVersion, endVersion int64, fn func(
 
 	prevVersion := startVersion - 1
 	prevRoot, err := ndb.GetRoot(prevVersion)
-	if err != nil && err != ErrVersionDoesNotExist {
+	if err != nil && !errors.Is(err, ErrVersionDoesNotExist) {
 		return err
 	}
 
@@ -1399,7 +1407,7 @@ func (ndb *nodeDB) String() (string, error) {
 	index := 0
 
 	err := ndb.traversePrefix(nodeKeyFormat.Prefix(), func(key, value []byte) error {
-		fmt.Fprintf(buf, "%s: %x\n", key, value)
+		_, _ = fmt.Fprintf(buf, "%s: %x\n", key, value)
 		return nil
 	})
 	if err != nil {
@@ -1411,12 +1419,12 @@ func (ndb *nodeDB) String() (string, error) {
 	if err = ndb.traverseNodes(func(node *Node) error {
 		switch {
 		case node == nil:
-			fmt.Fprintf(buf, "%s: \n", nodeKeyFormat.Prefix())
+			_, _ = fmt.Fprintf(buf, "%s: \n", nodeKeyFormat.Prefix())
 		case node.value == nil && node.subtreeHeight > 0:
-			fmt.Fprintf(buf, "%s: %s %-16s h=%d nodeKey=%v\n",
+			_, _ = fmt.Fprintf(buf, "%s: %s %-16s h=%d nodeKey=%v\n",
 				nodeKeyFormat.Prefix(), node.key, "", node.subtreeHeight, node.nodeKey)
 		default:
-			fmt.Fprintf(buf, "%s: %s = %-16s h=%d nodeKey=%v\n",
+			_, _ = fmt.Fprintf(buf, "%s: %s = %-16s h=%d nodeKey=%v\n",
 				nodeKeyFormat.Prefix(), node.key, node.value, node.subtreeHeight, node.nodeKey)
 		}
 		index++
@@ -1427,5 +1435,3 @@ func (ndb *nodeDB) String() (string, error) {
 
 	return "-" + "\n" + buf.String() + "-", nil
 }
-
-var ErrNodeMissingNodeKey = errors.New("node does not have a nodeKey")
diff --git a/nodedb_commit_test.go b/nodedb_commit_test.go
new file mode 100644
index 000000000..beb720572
--- /dev/null
+++ b/nodedb_commit_test.go
@@ -0,0 +1,167 @@
+package iavl
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	dbm "github.com/cosmos/iavl/db"
+	"github.com/stretchr/testify/require"
+)
+
+func setupNodeDB(t *testing.T) *nodeDB {
+	t.Helper()
+	db := dbm.NewMemDB()
+	return newNodeDB(db, 0, DefaultOptions(), NewNopLogger())
+}
+
+func TestCommittingFlags(t *testing.T) {
+	ndb := setupNodeDB(t)
+
+	require.False(t, ndb.IsCommitting())
+
+	ndb.SetCommitting()
+	require.True(t, ndb.IsCommitting())
+
+	ndb.UnsetCommitting()
+	require.False(t, ndb.IsCommitting())
+
+	select {
+	case <-ndb.chCommitting:
+		// expected
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("expected signal on chCommitting")
+	}
+}
+
+func TestCommittingSignalRelease(t *testing.T) {
+	ndb := setupNodeDB(t)
+	ndb.SetCommitting()
+
+	signaled := make(chan bool)
+	go func() {
+		<-ndb.chCommitting
+		signaled <- true
+	}()
+
+	time.Sleep(50 * time.Millisecond)
+	require.True(t, ndb.IsCommitting())
+
+	select {
+	case <-signaled:
+		t.Fatal("signal received too early")
+	case <-time.After(50 * time.Millisecond):
+		// ok
+	}
+
+	ndb.UnsetCommitting()
+
+	select {
+	case <-signaled:
+		// expected
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("expected signal after UnsetCommitting")
+	}
+}
+
+func TestDeleteFromPruningDuringCommit(t *testing.T) {
+	ndb := setupNodeDB(t)
+	dummyKey := []byte("key-to-delete")
+	ndb.SetCommitting()
+
+	done := make(chan struct{})
+	go func() {
+		err := ndb.deleteFromPruning(dummyKey)
+		require.NoError(t, err)
+		close(done)
+	}()
+
+	time.Sleep(50 * time.Millisecond)
+	select {
+	case <-done:
+		t.Fatal("deleteFromPruning should block during commit")
+	default:
+		// expected
+	}
+
+	ndb.UnsetCommitting()
+
+	select {
+	case <-done:
+		// success
+	case <-time.After(200 * time.Millisecond):
+		t.Fatal("deleteFromPruning did not resume after UnsetCommitting")
+	}
+}
+
+func TestSaveNodeFromPruningDuringCommit(t *testing.T) {
+	ndb := setupNodeDB(t)
+	node := &Node{
+		key:           []byte("key"),
+		value:         []byte("val"),
+		size:          1,
+		subtreeHeight: 0,
+		nodeKey:       &NodeKey{version: 1, nonce: 0},
+		hash:          []byte("hash"),
+	}
+
+	ndb.SetCommitting()
+	done := make(chan struct{})
+	go func() {
+		err := ndb.saveNodeFromPruning(node)
+		require.NoError(t, err)
+		close(done)
+	}()
+
+	time.Sleep(50 * time.Millisecond)
+	select {
+	case <-done:
+		t.Fatal("saveNodeFromPruning should block during commit")
+	default:
+		// expected
+	}
+
+	ndb.UnsetCommitting()
+
+	select {
+	case <-done:
+		// success
+	case <-time.After(200 * time.Millisecond):
+		t.Fatal("saveNodeFromPruning did not resume after UnsetCommitting")
+	}
+}
+
+func TestConcurrentCommitAndPruningAccess(t *testing.T) {
+	ndb := setupNodeDB(t)
+	var wg sync.WaitGroup
+
+	// Commit simulation: mark the commit before spawning the other goroutines so they reliably observe it
+	ndb.SetCommitting()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		time.Sleep(100 * time.Millisecond) // simulate commit duration
+		ndb.UnsetCommitting()
+	}()
+
+	// Pruning access simulation
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		start := time.Now()
+		err := ndb.deleteFromPruning([]byte("key1"))
+		require.NoError(t, err)
+		elapsed := time.Since(start)
+		require.GreaterOrEqual(t, elapsed.Milliseconds(), int64(100), "should wait until commit is done")
+	}()
+
+	// Simultaneous state check
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		time.Sleep(10 * time.Millisecond)
+		require.True(t, ndb.IsCommitting())
+	}()
+
+	wg.Wait()
+}
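
For readers of this patch, the following is a minimal, self-contained sketch of the read path described by the fastKeyFormat comment in nodedb.go: consult the fast index first, and fall back to the slow tree walk only when the fast entry was written at a version newer than the tree being queried. The names fastEntry, getWithFastPath, and slowGet are hypothetical stand-ins for illustration; they are not part of the iavl API.

package main

import "fmt"

// fastEntry mirrors what the fastKeyFormat comment describes: the value is
// stored together with the version at which it was written.
type fastEntry struct {
	version int64
	value   []byte
}

// getWithFastPath returns the value for key at treeVersion. If the fast index
// holds an entry written at or before treeVersion, it is authoritative and is
// returned directly; otherwise the caller's slow tree walk is used.
func getWithFastPath(
	treeVersion int64,
	key string,
	fastIndex map[string]fastEntry,
	slowGet func(string) ([]byte, bool),
) ([]byte, bool) {
	if e, ok := fastIndex[key]; ok && treeVersion >= e.version {
		return e.value, true
	}
	return slowGet(key)
}

func main() {
	fastIndex := map[string]fastEntry{
		"a": {version: 3, value: []byte("fast-a")},
		"b": {version: 9, value: []byte("fast-b")}, // written at a newer version than the one queried
	}
	slowGet := func(k string) ([]byte, bool) {
		return []byte("slow-" + k), true // stand-in for the IAVL tree walk
	}

	v, _ := getWithFastPath(5, "a", fastIndex, slowGet)
	fmt.Printf("a => %s\n", v) // fast-a: the fast entry is valid for version 5
	v, _ = getWithFastPath(5, "b", fastIndex, slowGet)
	fmt.Printf("b => %s\n", v) // slow-b: the fast entry is too new, fall back
}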
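
The new tests in nodedb_commit_test.go assert an observable contract: while a commit is in progress, pruning helpers such as deleteFromPruning and saveNodeFromPruning block, and chCommitting is signalled when UnsetCommitting is called. The sketch below is a simplified model of that contract using a mutex and a buffered channel; it is an assumption-laden illustration, not the nodeDB implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// committingGate models the behaviour the tests exercise: SetCommitting marks
// a commit as in progress, UnsetCommitting clears the flag and wakes a waiter,
// and waitIfCommitting blocks pruning-style work until the commit finishes.
type committingGate struct {
	mtx        sync.Mutex
	committing bool
	done       chan struct{}
}

func newCommittingGate() *committingGate {
	return &committingGate{done: make(chan struct{}, 1)}
}

func (g *committingGate) SetCommitting() {
	g.mtx.Lock()
	g.committing = true
	g.mtx.Unlock()
}

func (g *committingGate) UnsetCommitting() {
	g.mtx.Lock()
	g.committing = false
	g.mtx.Unlock()
	select {
	case g.done <- struct{}{}: // wake a waiter, mirroring the chCommitting signal
	default:
	}
}

func (g *committingGate) IsCommitting() bool {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.committing
}

// waitIfCommitting blocks while a commit is in progress, the way the
// pruning-side calls are expected to behave in the tests above.
func (g *committingGate) waitIfCommitting() {
	for g.IsCommitting() {
		<-g.done
	}
}

func main() {
	g := newCommittingGate()
	g.SetCommitting()
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate the commit duration
		g.UnsetCommitting()
	}()

	start := time.Now()
	g.waitIfCommitting() // the pruning side blocks until the commit ends
	fmt.Printf("waited %v for the commit to finish\n", time.Since(start))
}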