Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support rollback #1610

Open
wants to merge 14 commits into
base: node-updates-plugin-l2
Choose a base branch
from
Prev Previous commit
Next Next commit
Added constant keys handling
esuwu committed Mar 23, 2025
commit 671e4a32c2751fd020b3e993a7ae47cf1bd3530b
6 changes: 6 additions & 0 deletions cmd/blockchaininfo/nats_subscriber.go
Original file line number Diff line number Diff line change
@@ -159,6 +159,12 @@ func main() {
}
defer nc.Close()

keysErr := blockchaininfo.SendConstantKeys(nc)
if keysErr != nil {
zap.S().Fatalf("Failed to send constant keys: %v", keysErr)
return
}

_, err = nc.Subscribe(blockchaininfo.BlockUpdates, func(msg *nats.Msg) {
receiveBlockUpdates(msg)
})
5 changes: 2 additions & 3 deletions cmd/node/node.go
Original file line number Diff line number Diff line change
@@ -433,8 +433,7 @@ func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) {
var bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension
if nc.enableBlockchainUpdatesPlugin {
var bUErr error

bUpdatesExtension, bUErr = initializeBlockchainUpdatesPlugin(ctx, cfg, nc.BlockchainUpdatesL2Address,
bUpdatesExtension, bUErr = initializeBlockchainUpdatesExtension(ctx, cfg, nc.BlockchainUpdatesL2Address,
updatesChannel, &firstBlock, st)
if bUErr != nil {
return nil, errors.Wrap(bUErr, "failed to run blockchain updates plugin")
@@ -846,7 +845,7 @@ func runAPIs(
return nil
}

func initializeBlockchainUpdatesPlugin(
func initializeBlockchainUpdatesExtension(
ctx context.Context,
cfg *settings.BlockchainSettings,
l2ContractAddress string,
4 changes: 2 additions & 2 deletions pkg/blockchaininfo/bupdates.go
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ import (
)

type BlockchainUpdatesExtension struct {
Ctx context.Context
ctx context.Context
l2ContractAddress proto.WavesAddress
BUpdatesChannel chan proto.BUpdatesInfo
firstBlock *bool
@@ -24,7 +24,7 @@ func NewBlockchainUpdatesExtension(
firstBlock *bool,
) *BlockchainUpdatesExtension {
return &BlockchainUpdatesExtension{
Ctx: ctx,
ctx: ctx,
l2ContractAddress: l2ContractAddress,
BUpdatesChannel: bUpdatesChannel,
firstBlock: firstBlock,
89 changes: 58 additions & 31 deletions pkg/blockchaininfo/nats_publisher.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package blockchaininfo

import (
"context"
"strings"
"time"

"go.uber.org/zap"
@@ -29,31 +30,33 @@ const (

const L2RequestsTopic = "l2_requests_topic"
const (
RequestRestartSubTopic = "restart"
RequestRestartSubTopic = "restart"
RequestConstantKeysSubTopic = "constant_keys"
)

func ConcatenateContractTopics(contractAddress string) string {
return ContractUpdates + contractAddress
}

type BUpdatesExtensionState struct {
CurrentState *proto.BUpdatesInfo
PreviousState *proto.BUpdatesInfo // this information is what was just published
Limit uint64
Scheme proto.Scheme
L2ContractAddress string
HistoryJournal *HistoryJournal
St state.State
CurrentState *proto.BUpdatesInfo
PreviousState *proto.BUpdatesInfo // this information is what was just published
Limit uint64
Scheme proto.Scheme
constantContractKeys []string
L2ContractAddress string
HistoryJournal *HistoryJournal
St state.State
}

type UpdatesPublisher struct {
l2ContractAddress string
}

func NewBUpdatesExtensionState(limit uint64, scheme proto.Scheme, l2ContractAddress string,
state state.State) (*BUpdatesExtensionState, error) {
st state.State) (*BUpdatesExtensionState, error) {
stateCache := NewStateCache()
currentHeight, err := state.Height()
currentHeight, err := st.Height()
if err != nil {
return nil, err
}
@@ -63,42 +66,39 @@ func NewBUpdatesExtensionState(limit uint64, scheme proto.Scheme, l2ContractAddr
}
historyJournal := NewHistoryJournal()
for i := 0; i < HistoryJournalLengthMax; i++ {
dataEntriesAtHeight, retrieveErr := state.RetrieveEntriesAtHeight(l2address, currentHeight)
blockSnapshot, retrieveErr := st.SnapshotsAtHeight(currentHeight - 1)
if retrieveErr != nil {
return nil, retrieveErr
}
filteredDataEntries, filtrErr := filterDataEntries(currentHeight-limit, dataEntriesAtHeight)
if filtrErr != nil {
return nil, errors.Wrap(filtrErr, "failed to initialize state cache, failed to filter data entries")
}
blockInfo, pullErr := state.NewestBlockInfoByHeight(currentHeight)
blockInfo, pullErr := st.NewestBlockInfoByHeight(currentHeight - 1)
if pullErr != nil {
return nil, errors.Wrap(pullErr, "failed to get newest block info")
}
block, fetchErr := state.BlockByHeight(currentHeight)
if fetchErr != nil {
return nil, errors.Wrap(fetchErr, "failed to get block by height")
blockHeader, blockErr := st.NewestHeaderByHeight(currentHeight - 1)
if blockErr != nil {
return nil, errors.Wrap(blockErr, "failed to get newest block info")
}
blockUpdatesInfo := proto.BlockUpdatesInfo{
Height: blockInfo.Height,
VRF: blockInfo.VRF,
BlockID: block.BlockID(),
BlockHeader: block.BlockHeader,
bUpdatesInfo := state.BuildBlockUpdatesInfoFromSnapshot(blockInfo, blockHeader, blockSnapshot, l2address)

filteredDataEntries, filtrErr := filterDataEntries(currentHeight-limit,
bUpdatesInfo.ContractUpdatesInfo.AllDataEntries)
if filtrErr != nil {
return nil, errors.Wrap(filtrErr, "failed to initialize state cache, failed to filter data entries")
}
stateCache.AddCacheRecord(currentHeight, filteredDataEntries, blockUpdatesInfo)

stateCache.AddCacheRecord(currentHeight-1, filteredDataEntries, bUpdatesInfo.BlockUpdatesInfo)

historyEntry := HistoryEntry{
Height: currentHeight,
BlockID: block.BlockID(),
BlockID: bUpdatesInfo.BlockUpdatesInfo.BlockID,
Entries: filteredDataEntries,
}
historyJournal.Push(historyEntry)
currentHeight--
}

return &BUpdatesExtensionState{Limit: limit, Scheme: scheme,
L2ContractAddress: l2ContractAddress, HistoryJournal: historyJournal, St: state}, nil

L2ContractAddress: l2ContractAddress, HistoryJournal: historyJournal, St: st}, nil
}

func (bu *BUpdatesExtensionState) SetPreviousState(updates proto.BUpdatesInfo) {
@@ -293,6 +293,15 @@ func (bu *BUpdatesExtensionState) GeneratePatch(latestUpdates proto.BUpdatesInfo
return patch, nil
}

func (bu *BUpdatesExtensionState) IsKeyConstant(keyDataEntry string) bool {
for _, constantKey := range bu.constantContractKeys {
if constantKey == keyDataEntry {
return true
}
}
return false
}

func (bu *BUpdatesExtensionState) BuildPatch(keysForPatch []string, targetHeight uint64) (proto.DataEntries,
proto.BlockUpdatesInfo, error) {
l2WavesAddress, cnvrtErr := proto.NewAddressFromString(bu.L2ContractAddress)
@@ -305,9 +314,13 @@ func (bu *BUpdatesExtensionState) BuildPatch(keysForPatch []string, targetHeight
recipient := proto.NewRecipientFromAddress(l2WavesAddress)
dataEntry, ok, err := bu.HistoryJournal.StateCache.SearchValue(dataEntryKey, targetHeight)
if err != nil {
// height is too deep
dataEntry, err = bu.St.RetrieveEntry(recipient, dataEntryKey)
if err != nil {
// If the key is constant, we will go to State, if not, consider it a DeleteDataEntry
if bu.IsKeyConstant(dataEntryKey) {
dataEntry, err = bu.St.RetrieveEntry(recipient, dataEntryKey)
if err != nil {
dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey}
}
} else {
dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey}
}
}
@@ -464,6 +477,12 @@ func (e *BlockchainUpdatesExtension) RunBlockchainUpdatesPublisher(ctx context.C
}
defer nc.Close()

constantKeys, err := requestConstantKeys(nc)
if err != nil {
return errors.Wrap(err, "failed to request constant keys from the client")
}
e.blockchainExtensionState.constantContractKeys = constantKeys

updatesPublisher := UpdatesPublisher{l2ContractAddress: e.l2ContractAddress.String()}
// Publish the first 100 history entries for the rollback functionality.
publishHistoryBlocks(e, scheme, nc, updatesPublisher)
@@ -476,6 +495,14 @@ func (e *BlockchainUpdatesExtension) RunBlockchainUpdatesPublisher(ctx context.C
return nil
}

func requestConstantKeys(nc *nats.Conn) ([]string, error) {
msg, err := nc.Request(ConstantKeys, nil, ConnectionsTimeoutDefault)
if err != nil {
return nil, err
}
return strings.Split(string(msg.Data), ","), nil
}

func publishHistoryBlocks(e *BlockchainUpdatesExtension, scheme proto.Scheme,
nc *nats.Conn, updatesPublisher UpdatesPublisher) {
for _, historyEntry := range e.blockchainExtensionState.HistoryJournal.historyJournal {
40 changes: 39 additions & 1 deletion pkg/blockchaininfo/signals.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
package blockchaininfo

import "github.com/nats-io/nats.go"
import (
"strings"

"go.uber.org/zap"

"github.com/nats-io/nats.go"
)

const (
TokenID = "tokenId"
AllMiners = "allMiners"
LastChainID = "lastChainId"
MinerReward = "minerReward"
Chain0Height = "chain0Height"
ThisEpochData = "thisEpochData"
)

func ConstantContractKeys() []string {
return []string{
TokenID,
AllMiners,
LastChainID,
MinerReward,
Chain0Height,
ThisEpochData,
}
}

func SendRestartSignal(nc *nats.Conn) (*nats.Msg, error) {
message := []byte(RequestRestartSubTopic)
@@ -10,3 +36,15 @@ func SendRestartSignal(nc *nats.Conn) (*nats.Msg, error) {
}
return msg, nil
}

func SendConstantKeys(nc *nats.Conn) error {
_, subErr := nc.Subscribe(ConstantKeys, func(request *nats.Msg) {
constantKeys := strings.Join(ConstantContractKeys(), ",")
err := request.Respond([]byte(constantKeys))
if err != nil {
zap.S().Errorf("failed to respond to a restart signal, %v", err)
return
}
})
return subErr
}
2 changes: 2 additions & 0 deletions pkg/blockchaininfo/topics.go
Original file line number Diff line number Diff line change
@@ -4,4 +4,6 @@ package blockchaininfo
const (
BlockUpdates = "block_topic"
ContractUpdates = "contract_topic"

ConstantKeys = "constant_keys"
)
60 changes: 30 additions & 30 deletions pkg/mock/state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 0 additions & 71 deletions pkg/state/accounts_data_storage.go
Original file line number Diff line number Diff line change
@@ -282,77 +282,6 @@ func (s *accountsDataStorage) retrieveEntries(addr proto.Address) ([]proto.DataE
return entries, nil
}

func (s *accountsDataStorage) RetrieveEntriesAtHeight(addr proto.Address, height uint64) ([]proto.DataEntry, error) {
addrNum, err := s.addrToNum(addr)
if err != nil {
return nil, proto.ErrNotFound
}
key := accountsDataStorKey{addrNum: addrNum}

recordBytes, err := s.hs.entryDataAtHeight(key.bytes(), height)
if errors.Is(err, keyvalue.ErrNotFound) || errors.Is(err, errEmptyHist) || recordBytes == nil {
return nil, keyvalue.ErrNotFound
}
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve data at height")
}

var dataEntries proto.DataEntries
pos := 0
for {
// Check if there's enough data for a key size
if pos+2 > len(recordBytes) {
break
}

keySize := binary.BigEndian.Uint16(recordBytes[pos : pos+2])
pos += 2

// Check if there's enough data for the key
if pos+int(keySize) > len(recordBytes) {
break
}

key := recordBytes[pos : pos+int(keySize)]
pos += int(keySize)

// Check if there's enough data for a value size
if pos+2 > len(recordBytes) {
break
}

valueSize := binary.BigEndian.Uint16(recordBytes[pos : pos+2])
pos += 2

// Check if there's enough data for the value
if pos+int(valueSize) > len(recordBytes) {
break
}

value := recordBytes[pos : pos+int(valueSize)]
pos += int(valueSize)

var record dataEntryRecord
if unmarhslErr := record.unmarshalBinary(value); unmarhslErr != nil {
return nil, unmarhslErr
}

var entryKey accountsDataStorKey
if marshlErr := entryKey.unmarshal(key); marshlErr != nil {
return nil, marshlErr
}

entry, convErr := proto.NewDataEntryFromValueBytes(record.value)
if convErr != nil {
return nil, convErr
}

entry.SetKey(entryKey.entryKey)
dataEntries = append(dataEntries, entry)
}
return dataEntries, nil
}

func (s *accountsDataStorage) newestEntryExists(addr proto.Address) (bool, error) {
addrNum, newest, err := s.newestAddrToNum(addr)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/state/api.go
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ type StateInfo interface {
// Header getters.
Header(blockID proto.BlockID) (*proto.BlockHeader, error)
HeaderByHeight(height proto.Height) (*proto.BlockHeader, error)
NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error)
// Height returns current blockchain height.
Height() (proto.Height, error)
// Height <---> blockID converters.
@@ -83,7 +84,6 @@ type StateInfo interface {
// Accounts data storage.
RetrieveEntries(account proto.Recipient) ([]proto.DataEntry, error)
RetrieveEntry(account proto.Recipient, key string) (proto.DataEntry, error)
RetrieveEntriesAtHeight(addr proto.Address, height uint64) ([]proto.DataEntry, error)
RetrieveIntegerEntry(account proto.Recipient, key string) (*proto.IntegerDataEntry, error)
RetrieveBooleanEntry(account proto.Recipient, key string) (*proto.BooleanDataEntry, error)
RetrieveStringEntry(account proto.Recipient, key string) (*proto.StringDataEntry, error)
36 changes: 22 additions & 14 deletions pkg/state/appender.go
Original file line number Diff line number Diff line change
@@ -870,6 +870,26 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error {

func (a *txAppender) updateBlockchainUpdateInfo(blockInfo *proto.BlockInfo, blockHeader *proto.BlockHeader,
blockSnapshot proto.BlockSnapshot) error {
bUpdatesInfo := BuildBlockUpdatesInfoFromSnapshot(blockInfo, blockHeader, blockSnapshot,
a.bUpdatesPluginInfo.L2ContractAddress)

if *a.bUpdatesPluginInfo.FirstBlock {
dataEntries, err := a.ia.state.RetrieveEntries(proto.NewRecipientFromAddress(a.bUpdatesPluginInfo.L2ContractAddress))
if err != nil && !errors.Is(err, proto.ErrNotFound) {
return err
}
bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = dataEntries
a.bUpdatesPluginInfo.FirstBlockDone()
a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo)
return nil
}

a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo)
return nil
}

func BuildBlockUpdatesInfoFromSnapshot(blockInfo *proto.BlockInfo, blockHeader *proto.BlockHeader,
blockSnapshot proto.BlockSnapshot, l2ContractAddress proto.WavesAddress) proto.BUpdatesInfo {
blockID := blockHeader.BlockID()
bUpdatesInfo := proto.BUpdatesInfo{
BlockUpdatesInfo: proto.BlockUpdatesInfo{
@@ -883,31 +903,19 @@ func (a *txAppender) updateBlockchainUpdateInfo(blockInfo *proto.BlockInfo, bloc
Height: blockInfo.Height,
},
}
if *a.bUpdatesPluginInfo.FirstBlock {
dataEntries, err := a.ia.state.RetrieveEntries(proto.NewRecipientFromAddress(a.bUpdatesPluginInfo.L2ContractAddress))
if err != nil && !errors.Is(err, proto.ErrNotFound) {
return err
}
bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = dataEntries
a.bUpdatesPluginInfo.FirstBlockDone()
a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo)
return nil
}

// Write the L2 contract updates into the structure.
for _, txSnapshots := range blockSnapshot.TxSnapshots {
for _, snapshot := range txSnapshots {
if dataEntriesSnapshot, ok := snapshot.(*proto.DataEntriesSnapshot); ok {
if dataEntriesSnapshot.Address == a.bUpdatesPluginInfo.L2ContractAddress {
if dataEntriesSnapshot.Address == l2ContractAddress {
bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = append(bUpdatesInfo.ContractUpdatesInfo.AllDataEntries,
dataEntriesSnapshot.DataEntries...)
}
}
}
}

a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo)
return nil
return bUpdatesInfo
}

func (a *txAppender) createCheckerInfo(params *appendBlockParams) (*checkerInfo, error) {
15 changes: 14 additions & 1 deletion pkg/state/database.go
Original file line number Diff line number Diff line change
@@ -34,19 +34,32 @@ func (inf *stateInfo) unmarshalBinary(data []byte) error {
return cbor.Unmarshal(data, inf)
}

const forceUpdateStateVersion = false

func saveStateInfo(db keyvalue.KeyValue, params StateParams) error {
has, err := db.Has(stateInfoKeyBytes)
if err != nil {
return err
}
if has {
if has && !forceUpdateStateVersion {
return nil
}
info := &stateInfo{
Version: StateVersion,
HasExtendedApiData: params.StoreExtendedApiData,
HasStateHashes: params.BuildStateHashes,
}
if has && forceUpdateStateVersion {
data, dbErr := db.Get(stateInfoKeyBytes)
if dbErr != nil {
return dbErr
}
if ubErr := info.unmarshalBinary(data); ubErr != nil {
return ubErr
}
info.Version = StateVersion
}

return putStateInfoToDB(db, info)
}

1 change: 0 additions & 1 deletion pkg/state/history_storage.go
Original file line number Diff line number Diff line change
@@ -593,7 +593,6 @@ func (hs *historyStorage) getHistory(key []byte, update bool) (*historyRecord, e
// So we do both read and write under same lock.
hs.writeLock.Lock()
defer hs.writeLock.Unlock()

historyBytes, err := hs.db.Get(key)
if err != nil {
return nil, err // `keyvalue.ErrNotFound` is possible here along with other unwrapped DB errors
11 changes: 0 additions & 11 deletions pkg/state/state.go
Original file line number Diff line number Diff line change
@@ -2208,17 +2208,6 @@ func (s *stateManager) IsActiveLightNodeNewBlocksFields(blockHeight proto.Height
return s.cv.ShouldIncludeNewBlockFieldsOfLightNodeFeature(blockHeight)
}

func (s *stateManager) RetrieveEntriesAtHeight(addr proto.Address, height uint64) ([]proto.DataEntry, error) {
entries, err := s.stor.accountsDataStor.RetrieveEntriesAtHeight(addr, height)
if err != nil {
if errors.Is(err, proto.ErrNotFound) {
return nil, err
}
return nil, wrapErr(RetrievalError, err)
}
return entries, nil
}

func (s *stateManager) NewestAddrByAlias(alias proto.Alias) (proto.WavesAddress, error) {
addr, err := s.stor.aliases.newestAddrByAlias(alias.Alias)
if err != nil {
12 changes: 6 additions & 6 deletions pkg/state/threadsafe_wrapper.go
Original file line number Diff line number Diff line change
@@ -68,6 +68,12 @@ func (a *ThreadSafeReadWrapper) HeaderByHeight(height proto.Height) (*proto.Bloc
return a.s.HeaderByHeight(height)
}

func (a *ThreadSafeReadWrapper) NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) {
a.mu.RLock()
defer a.mu.RUnlock()
return a.s.NewestHeaderByHeight(height)
}

func (a *ThreadSafeReadWrapper) Height() (proto.Height, error) {
a.mu.RLock()
defer a.mu.RUnlock()
@@ -212,12 +218,6 @@ func (a *ThreadSafeReadWrapper) RetrieveEntries(account proto.Recipient) ([]prot
return a.s.RetrieveEntries(account)
}

func (a *ThreadSafeReadWrapper) RetrieveEntriesAtHeight(addr proto.Address, height uint64) ([]proto.DataEntry, error) {
a.mu.RLock()
defer a.mu.RUnlock()
return a.s.RetrieveEntriesAtHeight(addr, height)
}

func (a *ThreadSafeReadWrapper) RetrieveEntry(account proto.Recipient, key string) (proto.DataEntry, error) {
a.mu.RLock()
defer a.mu.RUnlock()

Unchanged files with check annotations Beta

func (typedData *ethereumTypedData) Hash() (EthereumHash, error) {
rawData, err := typedData.RawData()
if err != nil {
return EthereumHash{}, nil

Check failure on line 76 in pkg/proto/eth_typed_data.go

GitHub Actions / lint (macos-latest)

error is not nil (line 74) but it returns nil (nilerr)

Check failure on line 76 in pkg/proto/eth_typed_data.go

GitHub Actions / lint (ubuntu-latest)

error is not nil (line 74) but it returns nil (nilerr)

Check failure on line 76 in pkg/proto/eth_typed_data.go

GitHub Actions / ubuntu

error is not nil (line 74) but it returns nil (nilerr)
}
return Keccak256EthereumHash(rawData), nil
}
}
// dataMismatchError generates an error for a mismatch between

Check failure on line 408 in pkg/proto/eth_typed_data.go

GitHub Actions / lint (macos-latest)

Comment should end in a period (godot)

Check failure on line 408 in pkg/proto/eth_typed_data.go

GitHub Actions / lint (ubuntu-latest)

Comment should end in a period (godot)

Check failure on line 408 in pkg/proto/eth_typed_data.go

GitHub Actions / ubuntu

Comment should end in a period (godot)
// the provided type and data
func dataMismatchError(encType string, encValue interface{}) error {
return errors.Errorf("provided data '%v' doesn't match type '%s'", encValue, encType)