diff --git a/.mockery.yaml b/.mockery.yaml index a0c9da282..803a150b5 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -1,6 +1,7 @@ quiet: False dir: "{{.InterfaceDir}}/mocks" -mockname: "Mock{{.InterfaceName}}" +with-expecter: true +issue-845-fix: True filename: "{{.InterfaceName | snakecase}}.go" packages: @@ -9,3 +10,6 @@ packages: Header: Protocol: Handler: + github.com/wavesplatform/gowaves/pkg/blockchaininfo: + interfaces: + UpdatesPublisherInterface: \ No newline at end of file diff --git a/Makefile b/Makefile index ca466bfbe..850121810 100644 --- a/Makefile +++ b/Makefile @@ -237,6 +237,8 @@ mock: mockgen -source pkg/p2p/peer/peer.go -destination pkg/mock/peer.go -package mock Peer mockgen -source pkg/state/api.go -destination pkg/mock/state.go -package mock State mockgen -source pkg/grpc/server/api.go -destination pkg/mock/grpc.go -package mock GrpcHandlers + mockery --dir=pkg/mock --filename=blockchaininfo_types.go --outpkg=mock # The interface name must be specified in .mockery.yaml, see examples there. + proto: @protoc --proto_path=pkg/grpc/protobuf-schemas/proto/ --go_out=./ --go_opt=module=$(MODULE) --go-vtproto_out=./ --go-vtproto_opt=features=marshal_strict+unmarshal+size --go-vtproto_opt=module=$(MODULE) pkg/grpc/protobuf-schemas/proto/waves/*.proto diff --git a/cmd/blockchaininfo/nats_subscriber.go b/cmd/blockchaininfo/nats_subscriber.go index 4baf1dccf..7756db829 100644 --- a/cmd/blockchaininfo/nats_subscriber.go +++ b/cmd/blockchaininfo/nats_subscriber.go @@ -159,6 +159,12 @@ func main() { } defer nc.Close() + keysErr := blockchaininfo.SendConstantKeys(nc) + if keysErr != nil { + zap.S().Fatalf("Failed to send constant keys: %v", keysErr) + return + } + _, err = nc.Subscribe(blockchaininfo.BlockUpdates, func(msg *nats.Msg) { receiveBlockUpdates(msg) }) diff --git a/cmd/node/node.go b/cmd/node/node.go index 632081f8d..3364821d3 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -371,6 +371,20 @@ func run(nc *config) (retErr error) { return nil } +func initBlockchainUpdatesPlugin(ctx context.Context, + l2addressContract string, + enableBlockchainUpdatesPlugin bool, + updatesChannel chan<- proto.BUpdatesInfo, firstBlock *bool, +) (*proto.BlockchainUpdatesPluginInfo, error) { + l2address, cnvrtErr := proto.NewAddressFromString(l2addressContract) + if cnvrtErr != nil { + return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2addressContract) + } + bUpdatesPluginInfo := proto.NewBlockchainUpdatesPluginInfo(ctx, l2address, updatesChannel, + firstBlock, enableBlockchainUpdatesPlugin) + return bUpdatesPluginInfo, nil +} + func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) { cfg, err := blockchainSettings(nc) if err != nil { @@ -402,25 +416,44 @@ func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) { return nil, errors.Wrap(err, "failed to create state parameters") } + updatesChannel := make(chan proto.BUpdatesInfo) + extensionReady := make(chan struct{}) + firstBlock := false + bUpdatesPluginInfo, initErr := initBlockchainUpdatesPlugin(ctx, nc.BlockchainUpdatesL2Address, + nc.enableBlockchainUpdatesPlugin, updatesChannel, &firstBlock) + if initErr != nil { + return nil, errors.Wrap(err, "failed to initialize blockchain updates plugin") + } + st, err := state.NewState(path, true, params, cfg, nc.enableLightMode, bUpdatesPluginInfo) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize node's state") + } + defer func() { retErr = closeIfErrorf(st, retErr, "failed to close state") }() + go func() { + <-extensionReady + bUpdatesPluginInfo.MakeExtensionReady() + close(extensionReady) + }() + var bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension if nc.enableBlockchainUpdatesPlugin { var bUErr error - bUpdatesExtension, bUErr = runBlockchainUpdatesPlugin(ctx, cfg, nc.BlockchainUpdatesL2Address) + bUpdatesExtension, bUErr = initializeBlockchainUpdatesExtension(ctx, cfg, nc.BlockchainUpdatesL2Address, + updatesChannel, &firstBlock, st, extensionReady) if bUErr != nil { return nil, errors.Wrap(bUErr, "failed to run blockchain updates plugin") } - go bUpdatesExtension.ReceiveSignals() + go func() { + publshrErr := bUpdatesExtension.RunBlockchainUpdatesPublisher(ctx, + cfg.AddressSchemeCharacter) + if publshrErr != nil { + zap.S().Fatalf("Failed to run blockchain updates publisher: %v", publshrErr) + } + }() zap.S().Info("The blockchain info extension started pulling info from smart contract address", nc.BlockchainUpdatesL2Address) } - // Send updatesChannel into BlockchainSettings. Write updates into this channel - st, err := state.NewState(path, true, params, cfg, nc.enableLightMode, bUpdatesExtension) - if err != nil { - return nil, errors.Wrap(err, "failed to initialize node's state") - } - defer func() { retErr = closeIfErrorf(st, retErr, "failed to close state") }() - features, err := minerFeatures(st, nc.minerVoteFeatures) if err != nil { return nil, errors.Wrap(err, "failed to parse and validate miner features") @@ -817,34 +850,30 @@ func runAPIs( return nil } -func runBlockchainUpdatesPlugin( +func initializeBlockchainUpdatesExtension( ctx context.Context, cfg *settings.BlockchainSettings, l2ContractAddress string, + updatesChannel chan proto.BUpdatesInfo, + firstBlock *bool, + state state.State, + extensionReady chan<- struct{}, ) (*blockchaininfo.BlockchainUpdatesExtension, error) { - l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddress) - if cnvrtErr != nil { - return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2ContractAddress) - } - - bUpdatesExtensionState := blockchaininfo.NewBUpdatesExtensionState( + bUpdatesExtensionState, err := blockchaininfo.NewBUpdatesExtensionState( blockchaininfo.StoreBlocksLimit, cfg.AddressSchemeCharacter, l2ContractAddress, + state, ) - - updatesChannel := make(chan blockchaininfo.BUpdatesInfo) - requestsChannel := make(chan blockchaininfo.L2Requests) - go func() { - err := bUpdatesExtensionState.RunBlockchainUpdatesPublisher(ctx, updatesChannel, - cfg.AddressSchemeCharacter, requestsChannel) - if err != nil { - zap.S().Fatalf("Failed to run blockchain updates publisher: %v", err) - } - }() - + if err != nil { + return nil, errors.Wrap(err, "failed to initialize blockchain updates extension state") + } + l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddress) + if cnvrtErr != nil { + return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2ContractAddress) + } return blockchaininfo.NewBlockchainUpdatesExtension(ctx, l2address, updatesChannel, - requestsChannel, bUpdatesExtensionState), nil + bUpdatesExtensionState, firstBlock, extensionReady), nil } func FromArgs(scheme proto.Scheme, c *config) func(s *settings.NodeSettings) error { diff --git a/pkg/blockchaininfo/blockchaininfo_test.go b/pkg/blockchaininfo/blockchaininfo_test.go index e5c5fd2e5..a2e899474 100644 --- a/pkg/blockchaininfo/blockchaininfo_test.go +++ b/pkg/blockchaininfo/blockchaininfo_test.go @@ -1,21 +1,26 @@ package blockchaininfo_test import ( + "sort" + "strconv" "testing" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/wavesplatform/gowaves/pkg/blockchaininfo" + mocks "github.com/wavesplatform/gowaves/pkg/mock" "github.com/wavesplatform/gowaves/pkg/proto" ) const natsTestURL = "nats://127.0.0.1:4756" // some random test data. -func testBlockUpdates() blockchaininfo.BlockUpdatesInfo { - var b blockchaininfo.BlockUpdatesInfo +func testBlockUpdates() proto.BlockUpdatesInfo { + var b proto.BlockUpdatesInfo var ( height uint64 = 100 @@ -92,17 +97,17 @@ func TestChangesGenerationNewEntries(t *testing.T) { } var currentHeight uint64 = 3199611 - previousBlockInfo := blockchaininfo.BUpdatesInfo{ + previousBlockInfo := proto.BUpdatesInfo{ BlockUpdatesInfo: testBlockUpdates(), - ContractUpdatesInfo: blockchaininfo.L2ContractDataEntries{ + ContractUpdatesInfo: proto.L2ContractDataEntries{ AllDataEntries: previousDataEntries, Height: previousHeight, }, } - currentBlockInfo := blockchaininfo.BUpdatesInfo{ + currentBlockInfo := proto.BUpdatesInfo{ BlockUpdatesInfo: testBlockUpdates(), - ContractUpdatesInfo: blockchaininfo.L2ContractDataEntries{ + ContractUpdatesInfo: proto.L2ContractDataEntries{ AllDataEntries: currentDataEntries, Height: currentHeight, }, @@ -171,17 +176,17 @@ func TestChangesGenerationContainsPrevious(t *testing.T) { } var currentHeight uint64 = 3199611 - previousBlockInfo := blockchaininfo.BUpdatesInfo{ + previousBlockInfo := proto.BUpdatesInfo{ BlockUpdatesInfo: testBlockUpdates(), - ContractUpdatesInfo: blockchaininfo.L2ContractDataEntries{ + ContractUpdatesInfo: proto.L2ContractDataEntries{ AllDataEntries: previousDataEntries, Height: previousHeight, }, } - currentBlockInfo := blockchaininfo.BUpdatesInfo{ + currentBlockInfo := proto.BUpdatesInfo{ BlockUpdatesInfo: testBlockUpdates(), - ContractUpdatesInfo: blockchaininfo.L2ContractDataEntries{ + ContractUpdatesInfo: proto.L2ContractDataEntries{ AllDataEntries: currentDataEntries, Height: currentHeight, }, @@ -233,17 +238,17 @@ func TestNoChangesGeneration(t *testing.T) { } var currentHeight uint64 = 3199611 - previousBlockInfo := blockchaininfo.BUpdatesInfo{ + previousBlockInfo := proto.BUpdatesInfo{ BlockUpdatesInfo: testBlockUpdates(), - ContractUpdatesInfo: blockchaininfo.L2ContractDataEntries{ + ContractUpdatesInfo: proto.L2ContractDataEntries{ AllDataEntries: previousDataEntries, Height: previousHeight, }, } - currentBlockInfo := blockchaininfo.BUpdatesInfo{ + currentBlockInfo := proto.BUpdatesInfo{ BlockUpdatesInfo: testBlockUpdates(), - ContractUpdatesInfo: blockchaininfo.L2ContractDataEntries{ + ContractUpdatesInfo: proto.L2ContractDataEntries{ AllDataEntries: currentDataEntries, Height: currentHeight, }, @@ -318,3 +323,198 @@ func TestSendRestartSignal(t *testing.T) { require.Equal(t, msg.Data, []byte("ok")) } + +const ( + blockID1 = "7wKAcTGbvDtruMSSYyndzN9YK3cQ47ZdTPeT8ej22qRg" + BlockID2 = "gzz8aN4b5rr1rkeAdmuwytuGv1jbm9LLRbXNKNb7ETX" + BlockID3 = "GrgPhEZ5rruNPSac5QxirgoYA2VwEKBJju3ppPgNyBWi" + BlockID4 = "5g9Ws6Z3SJ9dXN3JqPQxVWeCEYssmYzFdVNXX1rcyHib" + BlockID5 = "AEB4sYgpA2wMVSdzSCkVuN3R2moPnQiStDs9gPSRStny" + BlockID6 = "5bEZ4Y9BiVvM53RtBWmpT5jADeLmSt2vmC1iBB2gKuE8" + + l2ContractAddress = "3Mw2AVgk5xNmkWQkzKKhinhBH1YyBTeVku2" + + checkedBlockNumber = 3 +) + +func fillThirdCheckedBlock(t *testing.T) ([]proto.DataEntry, proto.BlockUpdatesInfo) { + var integerEntries []proto.DataEntry + blockID, err := proto.NewBlockIDFromBase58(BlockID3) + + for j := 1; j <= 3; j++ { + integerDataEntry := &proto.IntegerDataEntry{ + Key: strconv.Itoa(j), + Value: int64(-j), + } + assert.NoError(t, err) + integerEntries = append(integerEntries, integerDataEntry) + } + blockInfo := proto.BlockUpdatesInfo{ + Height: uint64(3), + BlockID: blockID, + } + return integerEntries, blockInfo +} + +func fillHistoryJournal(t *testing.T, stateCache *blockchaininfo.StateCache) *blockchaininfo.HistoryJournal { + var historyJorunal blockchaininfo.HistoryJournal + blockIDs := []string{blockID1, BlockID2, BlockID3, BlockID4, BlockID5} + for i := 1; i <= 5; i++ { + if i == checkedBlockNumber { + integerEntries, blockInfo := fillThirdCheckedBlock(t) + historyEntry := blockchaininfo.HistoryEntry{ + Height: blockInfo.Height, + BlockID: blockInfo.BlockID, + Entries: integerEntries, + } + historyJorunal.Push(historyEntry) + } + + var integerEntries []proto.DataEntry + blockID, err := proto.NewBlockIDFromBase58(blockIDs[i-1]) + + for j := 1; j <= i; j++ { + integerDataEntry := &proto.IntegerDataEntry{ + Key: strconv.Itoa(j), + Value: int64(j), + } + assert.NoError(t, err) + integerEntries = append(integerEntries, integerDataEntry) + } + historyEntry := blockchaininfo.HistoryEntry{ + Height: uint64(i), + BlockID: blockID, + Entries: integerEntries, + } + historyJorunal.Push(historyEntry) + continue + } + historyJorunal.StateCache = stateCache + return &historyJorunal +} + +func fillCache(t *testing.T) *blockchaininfo.StateCache { + stateCache := blockchaininfo.NewStateCache() + blockIDs := []string{blockID1, BlockID2, BlockID3, BlockID4, BlockID5} + for i := 1; i <= 5; i++ { + if i == checkedBlockNumber { + integerEntries, blockInfo := fillThirdCheckedBlock(t) + stateCache.AddCacheRecord(blockInfo.Height, integerEntries, blockInfo) + continue + } + + var integerEntries []proto.DataEntry + blockID, err := proto.NewBlockIDFromBase58(blockIDs[i-1]) + require.NoError(t, err) + for j := 1; j <= i; j++ { + integerDataEntry := &proto.IntegerDataEntry{ + Key: strconv.Itoa(j), + Value: int64(j), + } + integerEntries = append(integerEntries, integerDataEntry) + } + blockInfo := proto.BlockUpdatesInfo{ + Height: uint64(i), + BlockID: blockID, + } + stateCache.AddCacheRecord(uint64(i), integerEntries, blockInfo) + } + return stateCache +} + +// Rollback from block 5 to block 3. +// On block 3, keys "1", "2", "3" had negative values, so the patch should generate the negative +// values only found on that block. +func TestRollback(t *testing.T) { + mockPublisherInterface := mocks.NewMockUpdatesPublisherInterface(t) + mockPublisherInterface.EXPECT().PublishUpdates(mock.Anything, mock.Anything, proto.TestNetScheme, + l2ContractAddress).Return(nil) + mockPublisherInterface.EXPECT().L2ContractAddress().Return(l2ContractAddress) + + blockID6, err := proto.NewBlockIDFromBase58(BlockID6) + assert.NoError(t, err) + currentState := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: 6, + BlockID: blockID6, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: 6, + AllDataEntries: []proto.DataEntry{&proto.IntegerDataEntry{ + Key: "5", + Value: 6, + }}, + }, + } + blockID5, err := proto.NewBlockIDFromBase58(BlockID5) + assert.NoError(t, err) + previousState := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: 5, + BlockID: blockID5, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: 5, + AllDataEntries: []proto.DataEntry{&proto.IntegerDataEntry{ + Key: "5", + Value: 5, + }}, + }, + } + blockID4, err := proto.NewBlockIDFromBase58(BlockID4) + assert.NoError(t, err) + updates := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: 4, + BlockID: blockID4, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: 4, + AllDataEntries: []proto.DataEntry{&proto.IntegerDataEntry{ + Key: "4", + Value: 4, + }}, + }, + } + + updatesExtensionState := &blockchaininfo.BUpdatesExtensionState{ + CurrentState: ¤tState, + PreviousState: &previousState, + Limit: 100, + Scheme: proto.TestNetScheme, + L2ContractAddress: l2ContractAddress, + HistoryJournal: fillHistoryJournal(t, fillCache(t)), + St: nil, + } + + // Rollback from block 5 to 3 + patch := blockchaininfo.HandleRollback(updatesExtensionState, updates, mockPublisherInterface, + nil, proto.TestNetScheme) + + expectedPatchEntries := []proto.DataEntry{ + &proto.IntegerDataEntry{ + Key: "1", + Value: -1, + }, + &proto.IntegerDataEntry{ + Key: "2", + Value: -2, + }, + &proto.IntegerDataEntry{ + Key: "3", + Value: -3, + }, + &proto.DeleteDataEntry{Key: "4"}, + &proto.DeleteDataEntry{Key: "5"}, + } + expectedL2Patch := proto.L2ContractDataEntries{ + AllDataEntries: expectedPatchEntries, + Height: 3, + } + + sort.Sort(patch.ContractUpdatesInfo.AllDataEntries) + sort.Sort(expectedL2Patch.AllDataEntries) + + assert.Equal(t, patch.ContractUpdatesInfo.AllDataEntries, expectedL2Patch.AllDataEntries) + assert.Equal(t, patch.ContractUpdatesInfo.Height, expectedL2Patch.Height) +} diff --git a/pkg/blockchaininfo/bupdates.go b/pkg/blockchaininfo/bupdates.go index 2bde26720..9b34e8caf 100644 --- a/pkg/blockchaininfo/bupdates.go +++ b/pkg/blockchaininfo/bupdates.go @@ -2,95 +2,63 @@ package blockchaininfo import ( "context" - "time" + "sync" "github.com/wavesplatform/gowaves/pkg/proto" - "go.uber.org/zap" ) -const ChannelWriteTimeout = 10 * time.Second - type BlockchainUpdatesExtension struct { - ctx context.Context - enableBlockchainUpdatesPlugin bool - l2ContractAddress proto.WavesAddress - bUpdatesChannel chan<- BUpdatesInfo - l2RequestsChannel <-chan L2Requests - firstBlock bool - blockchainExtensionState *BUpdatesExtensionState + ctx context.Context + l2ContractAddress proto.WavesAddress + BUpdatesChannel chan proto.BUpdatesInfo + firstBlock *bool + blockchainExtensionState *BUpdatesExtensionState + Lock sync.Mutex + extensionReady chan<- struct{} } func NewBlockchainUpdatesExtension( ctx context.Context, l2ContractAddress proto.WavesAddress, - bUpdatesChannel chan<- BUpdatesInfo, - requestChannel <-chan L2Requests, + bUpdatesChannel chan proto.BUpdatesInfo, blockchainExtensionState *BUpdatesExtensionState, + firstBlock *bool, + extensionReady chan<- struct{}, ) *BlockchainUpdatesExtension { return &BlockchainUpdatesExtension{ - ctx: ctx, - enableBlockchainUpdatesPlugin: true, - l2ContractAddress: l2ContractAddress, - bUpdatesChannel: bUpdatesChannel, - l2RequestsChannel: requestChannel, - firstBlock: true, - blockchainExtensionState: blockchainExtensionState, + ctx: ctx, + l2ContractAddress: l2ContractAddress, + BUpdatesChannel: bUpdatesChannel, + firstBlock: firstBlock, + blockchainExtensionState: blockchainExtensionState, + extensionReady: extensionReady, } } -func (e *BlockchainUpdatesExtension) EnableBlockchainUpdatesPlugin() bool { - return e != nil && e.enableBlockchainUpdatesPlugin -} - func (e *BlockchainUpdatesExtension) L2ContractAddress() proto.WavesAddress { return e.l2ContractAddress } -func (e *BlockchainUpdatesExtension) IsFirstRequestedBlock() bool { - return e.firstBlock -} - -func (e *BlockchainUpdatesExtension) FirstBlockDone() { - e.firstBlock = false +func (e *BlockchainUpdatesExtension) MarkExtensionReady() { + e.Lock.Lock() + defer e.Lock.Unlock() + e.extensionReady <- struct{}{} } -func (e *BlockchainUpdatesExtension) ReceiveSignals() { - for { - select { - case <-e.ctx.Done(): - return - case l2Request, ok := <-e.l2RequestsChannel: - if !ok { - zap.S().Errorf("can't read from l2RequestsChannel, the channel is closed") - return - } - if l2Request.Restart { - e.firstBlock = true - e.blockchainExtensionState.previousState = nil - } - } - } +func (e *BlockchainUpdatesExtension) IsFirstRequestedBlock() bool { + return *e.firstBlock } -func (e *BlockchainUpdatesExtension) WriteBUpdates(bUpdates BUpdatesInfo) { - if e.bUpdatesChannel == nil { - return - } - select { - case e.bUpdatesChannel <- bUpdates: - case <-time.After(ChannelWriteTimeout): - zap.S().Errorf("failed to write into the blockchain updates channel, out of time") - return - case <-e.ctx.Done(): - e.close() - return - } +func (e *BlockchainUpdatesExtension) EmptyPreviousState() { + e.Lock.Lock() + *e.firstBlock = true + e.blockchainExtensionState.PreviousState = nil + defer e.Lock.Unlock() } -func (e *BlockchainUpdatesExtension) close() { - if e.bUpdatesChannel == nil { - return +func (e *BlockchainUpdatesExtension) Close() { + if e.BUpdatesChannel != nil { + close(e.BUpdatesChannel) } - close(e.bUpdatesChannel) - e.bUpdatesChannel = nil + e.BUpdatesChannel = nil } diff --git a/pkg/blockchaininfo/nats_publisher.go b/pkg/blockchaininfo/nats_publisher.go index d4aed0374..ea8f22b72 100644 --- a/pkg/blockchaininfo/nats_publisher.go +++ b/pkg/blockchaininfo/nats_publisher.go @@ -2,6 +2,8 @@ package blockchaininfo import ( "context" + "strings" + "sync" "time" "go.uber.org/zap" @@ -10,10 +12,11 @@ import ( "github.com/nats-io/nats.go" "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/proto" + "github.com/wavesplatform/gowaves/pkg/state" ) const StoreBlocksLimit = 200 -const ConnectionsTimeoutDefault = 5 * server.AUTH_TIMEOUT +const ConnectionsTimeoutDefault = 10 * server.AUTH_TIMEOUT const portDefault = 4222 const hostDefault = "127.0.0.1" @@ -28,7 +31,8 @@ const ( const L2RequestsTopic = "l2_requests_topic" const ( - RequestRestartSubTopic = "restart" + RequestRestartSubTopic = "restart" + RequestConstantKeysSubTopic = "constant_keys" ) func ConcatenateContractTopics(contractAddress string) string { @@ -36,30 +40,87 @@ func ConcatenateContractTopics(contractAddress string) string { } type BUpdatesExtensionState struct { - currentState *BUpdatesInfo - previousState *BUpdatesInfo // this information is what was just published - Limit uint64 - scheme proto.Scheme + CurrentState *proto.BUpdatesInfo + PreviousState *proto.BUpdatesInfo // this information is what was just published + Limit uint64 + Scheme proto.Scheme + constantContractKeys []string + L2ContractAddress string + HistoryJournal *HistoryJournal + St state.State +} + +type UpdatesPublisher struct { l2ContractAddress string } -func NewBUpdatesExtensionState(limit uint64, scheme proto.Scheme, l2ContractAddress string) *BUpdatesExtensionState { - return &BUpdatesExtensionState{Limit: limit, scheme: scheme, l2ContractAddress: l2ContractAddress} +func NewBUpdatesExtensionState(limit uint64, scheme proto.Scheme, l2ContractAddress string, + st state.State) (*BUpdatesExtensionState, error) { + stateCache := NewStateCache() + currentHeight, err := st.Height() + if err != nil { + return nil, err + } + l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddress) + if cnvrtErr != nil { + return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %s", l2ContractAddress) + } + historyJournal := NewHistoryJournal() + for targetHeight := currentHeight - HistoryJournalLengthMax; targetHeight < currentHeight; targetHeight++ { + blockSnapshot, retrieveErr := st.SnapshotsAtHeight(targetHeight) + if retrieveErr != nil { + return nil, retrieveErr + } + blockInfo, pullErr := st.NewestBlockInfoByHeight(targetHeight) + if pullErr != nil { + return nil, errors.Wrap(pullErr, "failed to get newest block info") + } + blockHeader, blockErr := st.NewestHeaderByHeight(targetHeight) + if blockErr != nil { + return nil, errors.Wrap(blockErr, "failed to get newest block info") + } + bUpdatesInfo := state.BuildBlockUpdatesInfoFromSnapshot(blockInfo, blockHeader, blockSnapshot, l2address) + + filteredDataEntries, filtrErr := filterDataEntries(targetHeight-limit, + bUpdatesInfo.ContractUpdatesInfo.AllDataEntries) + if filtrErr != nil { + return nil, errors.Wrap(filtrErr, "failed to initialize state cache, failed to filter data entries") + } + + stateCache.AddCacheRecord(targetHeight, filteredDataEntries, bUpdatesInfo.BlockUpdatesInfo) + + historyEntry := HistoryEntry{ + Height: targetHeight, + BlockID: bUpdatesInfo.BlockUpdatesInfo.BlockID, + Entries: filteredDataEntries, + VRF: bUpdatesInfo.BlockUpdatesInfo.VRF, + BlockHeader: bUpdatesInfo.BlockUpdatesInfo.BlockHeader, + } + historyJournal.Push(historyEntry) + } + historyJournal.StateCache = stateCache + + return &BUpdatesExtensionState{Limit: limit, Scheme: scheme, + L2ContractAddress: l2ContractAddress, HistoryJournal: historyJournal, St: st}, nil +} + +func (bu *BUpdatesExtensionState) SetPreviousState(updates proto.BUpdatesInfo) { + bu.PreviousState = &updates } -func (bu *BUpdatesExtensionState) hasStateChanged() (bool, BUpdatesInfo, error) { - statesAreEqual, changes, err := bu.statesEqual(bu.scheme) +func (bu *BUpdatesExtensionState) HasStateChanged() (bool, proto.BUpdatesInfo, error) { + statesAreEqual, changes, err := bu.StatesEqual(bu.Scheme) if err != nil { - return false, BUpdatesInfo{}, err + return false, proto.BUpdatesInfo{}, err } if statesAreEqual { - return false, BUpdatesInfo{}, nil + return false, proto.BUpdatesInfo{}, nil } return true, changes, nil } -func (bu *BUpdatesExtensionState) statesEqual(scheme proto.Scheme) (bool, BUpdatesInfo, error) { - return CompareBUpdatesInfo(*bu.currentState, *bu.previousState, scheme) +func (bu *BUpdatesExtensionState) StatesEqual(scheme proto.Scheme) (bool, proto.BUpdatesInfo, error) { + return CompareBUpdatesInfo(*bu.CurrentState, *bu.PreviousState, scheme) } func splitIntoChunks(array []byte, maxChunkSize int) [][]byte { @@ -79,7 +140,8 @@ func splitIntoChunks(array []byte, maxChunkSize int) [][]byte { return chunkedArray } -func (bu *BUpdatesExtensionState) publishContractUpdates(contractUpdates L2ContractDataEntries, nc *nats.Conn) error { +func PublishContractUpdates(contractUpdates proto.L2ContractDataEntries, nc *nats.Conn, + l2ContractAddress string) error { dataEntriesProtobuf, err := L2ContractDataEntriesToProto(contractUpdates).MarshalVTStrict() if err != nil { return err @@ -89,12 +151,12 @@ func (bu *BUpdatesExtensionState) publishContractUpdates(contractUpdates L2Contr var msg []byte msg = append(msg, NoPaging) msg = append(msg, dataEntriesProtobuf...) - err = nc.Publish(ConcatenateContractTopics(bu.l2ContractAddress), msg) + err = nc.Publish(ConcatenateContractTopics(l2ContractAddress), msg) if err != nil { - zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(l2ContractAddress)) return err } - zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(l2ContractAddress)) return nil } @@ -106,29 +168,29 @@ func (bu *BUpdatesExtensionState) publishContractUpdates(contractUpdates L2Contr if i == len(chunkedPayload)-1 { msg = append(msg, EndPaging) msg = append(msg, chunk...) - err = nc.Publish(ConcatenateContractTopics(bu.l2ContractAddress), msg) + err = nc.Publish(ConcatenateContractTopics(l2ContractAddress), msg) if err != nil { - zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(l2ContractAddress)) return err } - zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(l2ContractAddress)) break } msg = append(msg, StartPaging) msg = append(msg, chunk...) - err = nc.Publish(ConcatenateContractTopics(bu.l2ContractAddress), msg) + err = nc.Publish(ConcatenateContractTopics(l2ContractAddress), msg) if err != nil { - zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(l2ContractAddress)) return err } - zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(l2ContractAddress)) time.Sleep(publisherWaitingTime) } return nil } -func (bu *BUpdatesExtensionState) publishBlockUpdates(updates BUpdatesInfo, nc *nats.Conn, scheme proto.Scheme) error { +func PublishBlockUpdates(updates proto.BUpdatesInfo, nc *nats.Conn, scheme proto.Scheme) error { blockInfo, err := BUpdatesInfoToProto(updates, scheme) if err != nil { return err @@ -146,9 +208,10 @@ func (bu *BUpdatesExtensionState) publishBlockUpdates(updates BUpdatesInfo, nc * return nil } -func (bu *BUpdatesExtensionState) publishUpdates(updates BUpdatesInfo, nc *nats.Conn, scheme proto.Scheme) error { +func (p *UpdatesPublisher) PublishUpdates(updates proto.BUpdatesInfo, + nc *nats.Conn, scheme proto.Scheme, l2ContractAddress string) error { /* first publish block data */ - err := bu.publishBlockUpdates(updates, nc, scheme) + err := PublishBlockUpdates(updates, nc, scheme) if err != nil { zap.S().Errorf("failed to publish message on topic %s", BlockUpdates) return err @@ -156,82 +219,235 @@ func (bu *BUpdatesExtensionState) publishUpdates(updates BUpdatesInfo, nc *nats. /* second publish contract data entries */ if updates.ContractUpdatesInfo.AllDataEntries != nil { - pblshErr := bu.publishContractUpdates(updates.ContractUpdatesInfo, nc) + pblshErr := PublishContractUpdates(updates.ContractUpdatesInfo, nc, l2ContractAddress) if pblshErr != nil { - zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(p.L2ContractAddress())) return pblshErr } - zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(bu.l2ContractAddress)) + zap.S().Infof("Published on topic: %s\n", ConcatenateContractTopics(p.L2ContractAddress())) + } + + return nil +} + +func (p *UpdatesPublisher) L2ContractAddress() string { + return p.l2ContractAddress +} + +func (bu *BUpdatesExtensionState) AddEntriesToHistoryJournalAndCache(updates proto.BUpdatesInfo) { + height := updates.BlockUpdatesInfo.Height + blockID := updates.BlockUpdatesInfo.BlockID + + historyEntry := HistoryEntry{ + Height: height, + BlockID: blockID, + Entries: updates.ContractUpdatesInfo.AllDataEntries, + VRF: updates.BlockUpdatesInfo.VRF, + BlockHeader: updates.BlockUpdatesInfo.BlockHeader, + } + bu.HistoryJournal.Push(historyEntry) + bu.HistoryJournal.StateCache.AddCacheRecord(height, updates.ContractUpdatesInfo.AllDataEntries, + updates.BlockUpdatesInfo) +} + +func (bu *BUpdatesExtensionState) RollbackHappened(updates proto.BUpdatesInfo, previousState proto.BUpdatesInfo) bool { + if _, _, blockIDFound := bu.HistoryJournal.SearchByBlockID(updates.BlockUpdatesInfo.BlockHeader.Parent); blockIDFound { + return false + } + if updates.BlockUpdatesInfo.Height < previousState.BlockUpdatesInfo.Height { + return true + } + return false +} + +func (bu *BUpdatesExtensionState) GeneratePatch(latestUpdates proto.BUpdatesInfo) (proto.BUpdatesInfo, error) { + keysForAnalysis, found := bu.HistoryJournal.FetchKeysUntilBlockID(latestUpdates.BlockUpdatesInfo.BlockID) + if !found { + previousHeight := bu.PreviousState.BlockUpdatesInfo.Height + newHeight := latestUpdates.BlockUpdatesInfo.Height + return proto.BUpdatesInfo{}, errors.Errorf("failed to fetch keys after rollback, the rollback is too deep."+ + "Previous height %d, new height %d", previousHeight, newHeight) + } + + // If the key is found in the state, fetch it. If it is not found, it creates a DeleteEntry. + patchDataEntries, patchBlockInfo, err := bu.BuildPatch(keysForAnalysis, + latestUpdates.BlockUpdatesInfo.Height-1) // Height. + // -1 because the current height is from the new block updates, + // and we need to return the client's state to the previous block + if err != nil { + return proto.BUpdatesInfo{}, err + } + + // Clean the journal and cache after rollback + historyJournalLatestHeight, err := bu.HistoryJournal.TopHeight() + if err != nil { + return proto.BUpdatesInfo{}, err + } + err = bu.CleanRecordsAfterRollback(historyJournalLatestHeight, latestUpdates.BlockUpdatesInfo.Height) + if err != nil { + return proto.BUpdatesInfo{}, err + } + + patch := proto.BUpdatesInfo{ + BlockUpdatesInfo: patchBlockInfo, // wrong, must be the previous block + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: patchDataEntries, + Height: latestUpdates.ContractUpdatesInfo.Height - 1, + }, + } + return patch, nil +} + +func (bu *BUpdatesExtensionState) IsKeyConstant(keyDataEntry string) bool { + for _, constantKey := range bu.constantContractKeys { + if constantKey == keyDataEntry { + return true + } + } + return false +} + +func (bu *BUpdatesExtensionState) BuildPatch(keysForPatch []string, targetHeight uint64) (proto.DataEntries, + proto.BlockUpdatesInfo, error) { + l2WavesAddress, cnvrtErr := proto.NewAddressFromString(bu.L2ContractAddress) + if cnvrtErr != nil { + return nil, proto.BlockUpdatesInfo{}, errors.Wrapf(cnvrtErr, + "failed to convert L2 contract address %q", bu.L2ContractAddress) + } + patch := make(map[string]proto.DataEntry) + for _, dataEntryKey := range keysForPatch { + recipient := proto.NewRecipientFromAddress(l2WavesAddress) + dataEntry, ok, err := bu.HistoryJournal.StateCache.SearchValue(dataEntryKey, targetHeight) + if err != nil { + // If the key is constant, we will go to State, if not, consider it a DeleteDataEntry + if bu.IsKeyConstant(dataEntryKey) { + dataEntry, err = bu.St.RetrieveEntry(recipient, dataEntryKey) + if err != nil { + dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey} + } + } else { + dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey} + } + } + if !ok { + dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey} + } + patch[dataEntry.GetKey()] = dataEntry + } + var patchArray []proto.DataEntry + for _, elem := range patch { + patchArray = append(patchArray, elem) + } + blockInfo, err := bu.HistoryJournal.StateCache.SearchBlockInfo(targetHeight) + if err != nil { + return nil, proto.BlockUpdatesInfo{}, err + } + return patchArray, blockInfo, nil +} + +func (bu *BUpdatesExtensionState) CleanRecordsAfterRollback(latestHeightFromHistory uint64, + heightAfterRollback uint64) error { + err := bu.HistoryJournal.CleanAfterRollback(latestHeightFromHistory, heightAfterRollback) + if err != nil { + return err } + // This should never happen + if latestHeightFromHistory < heightAfterRollback { + return errors.New("the height after rollback is bigger than the last saved height") + } + for i := latestHeightFromHistory; i >= heightAfterRollback; i-- { + bu.HistoryJournal.StateCache.RemoveCacheRecord(i) + } return nil } -func handleBlockchainUpdate(updates BUpdatesInfo, bu *BUpdatesExtensionState, scheme proto.Scheme, nc *nats.Conn) { +func HandleRollback(be *BUpdatesExtensionState, updates proto.BUpdatesInfo, updatesPublisher UpdatesPublisherInterface, + nc *nats.Conn, scheme proto.Scheme) proto.BUpdatesInfo { + patch, err := be.GeneratePatch(updates) + if err != nil { + zap.S().Errorf("failed to generate a patch, %v", err) + } + pblshErr := updatesPublisher.PublishUpdates(patch, nc, scheme, updatesPublisher.L2ContractAddress()) + if pblshErr != nil { + zap.S().Errorf("failed to publish updates, %v", pblshErr) + return proto.BUpdatesInfo{} + } + be.AddEntriesToHistoryJournalAndCache(patch) + be.SetPreviousState(patch) + return patch +} + +func handleBlockchainUpdate(updates proto.BUpdatesInfo, be *BUpdatesExtensionState, scheme proto.Scheme, nc *nats.Conn, + updatesPublisher UpdatesPublisher, handleRollback bool) { // update current state - bu.currentState = &updates - if bu.previousState == nil { + be.CurrentState = &updates + if be.PreviousState == nil { // publish initial updates - filteredDataEntries, err := filterDataEntries(updates.BlockUpdatesInfo.Height-bu.Limit, + filteredDataEntries, err := filterDataEntries(updates.BlockUpdatesInfo.Height-be.Limit, updates.ContractUpdatesInfo.AllDataEntries) if err != nil { return } updates.ContractUpdatesInfo.AllDataEntries = filteredDataEntries - pblshErr := bu.publishUpdates(updates, nc, scheme) + pblshErr := updatesPublisher.PublishUpdates(updates, nc, scheme, updatesPublisher.L2ContractAddress()) if pblshErr != nil { zap.S().Errorf("failed to publish updates, %v", pblshErr) return } - bu.previousState = &updates + be.PreviousState = &updates return } + if handleRollback { + if be.RollbackHappened(updates, *be.PreviousState) { + HandleRollback(be, updates, &updatesPublisher, nc, scheme) + } + } // compare the current state to the previous state - stateChanged, changes, cmprErr := bu.hasStateChanged() + stateChanged, changes, cmprErr := be.HasStateChanged() if cmprErr != nil { zap.S().Errorf("failed to compare current and previous states, %v", cmprErr) return } // if there is any diff, send the update if stateChanged { - pblshErr := bu.publishUpdates(changes, nc, scheme) + pblshErr := updatesPublisher.PublishUpdates(updates, nc, scheme, updatesPublisher.L2ContractAddress()) if pblshErr != nil { zap.S().Errorf("failed to publish changes, %v", pblshErr) } - bu.previousState = &updates + be.AddEntriesToHistoryJournalAndCache(changes) + be.PreviousState = &updates } } -func runPublisher(ctx context.Context, updatesChannel <-chan BUpdatesInfo, - bu *BUpdatesExtensionState, scheme proto.Scheme, nc *nats.Conn) { +func runPublisher(ctx context.Context, extension *BlockchainUpdatesExtension, scheme proto.Scheme, nc *nats.Conn, + updatesPublisher UpdatesPublisher) { for { select { - case updates, ok := <-updatesChannel: + case updates, ok := <-extension.BUpdatesChannel: if !ok { zap.S().Errorf("the updates channel for publisher was closed") return } - handleBlockchainUpdate(updates, bu, scheme, nc) + handleBlockchainUpdate(updates, extension.blockchainExtensionState, scheme, nc, updatesPublisher, true) case <-ctx.Done(): return } } } -func runReceiver(requestsChannel chan<- L2Requests, nc *nats.Conn) error { +func runReceiver(nc *nats.Conn, bu *BlockchainUpdatesExtension) error { _, subErr := nc.Subscribe(L2RequestsTopic, func(request *nats.Msg) { signal := string(request.Data) switch signal { case RequestRestartSubTopic: - l2Request := L2Requests{Restart: true} - requestsChannel <- l2Request - notNilResponse := "ok" err := request.Respond([]byte(notNilResponse)) if err != nil { zap.S().Errorf("failed to respond to a restart signal, %v", err) return } + bu.EmptyPreviousState() default: zap.S().Errorf("nats receiver received an unknown signal, %s", signal) } @@ -239,8 +455,8 @@ func runReceiver(requestsChannel chan<- L2Requests, nc *nats.Conn) error { return subErr } -func (bu *BUpdatesExtensionState) RunBlockchainUpdatesPublisher(ctx context.Context, - updatesChannel <-chan BUpdatesInfo, scheme proto.Scheme, l2Requests chan<- L2Requests) error { +func (e *BlockchainUpdatesExtension) RunBlockchainUpdatesPublisher(ctx context.Context, + scheme proto.Scheme) error { opts := &server.Options{ MaxPayload: natsMaxPayloadSize, Host: hostDefault, @@ -268,10 +484,50 @@ func (bu *BUpdatesExtensionState) RunBlockchainUpdatesPublisher(ctx context.Cont } defer nc.Close() - receiverErr := runReceiver(l2Requests, nc) + var wg sync.WaitGroup + wg.Add(1) + reqErr := e.requestConstantKeys(nc, &wg) + if reqErr != nil { + return errors.Wrap(reqErr, "failed to request constant keys from the client") + } + wg.Wait() + e.MarkExtensionReady() + updatesPublisher := UpdatesPublisher{l2ContractAddress: e.l2ContractAddress.String()} + // Publish the first 100 history entries for the rollback functionality. + publishHistoryBlocks(e, scheme, nc, updatesPublisher) + + receiverErr := runReceiver(nc, e) if receiverErr != nil { return receiverErr } - runPublisher(ctx, updatesChannel, bu, scheme, nc) + runPublisher(ctx, e, scheme, nc, updatesPublisher) return nil } + +func (e *BlockchainUpdatesExtension) requestConstantKeys(nc *nats.Conn, wg *sync.WaitGroup) error { + _, subErr := nc.Subscribe(ConstantKeys, func(msg *nats.Msg) { + defer wg.Done() + constantKeys := strings.Split(string(msg.Data), ",") + e.blockchainExtensionState.constantContractKeys = constantKeys + }) + return subErr +} + +func publishHistoryBlocks(e *BlockchainUpdatesExtension, scheme proto.Scheme, + nc *nats.Conn, updatesPublisher UpdatesPublisher) { + for _, historyEntry := range e.blockchainExtensionState.HistoryJournal.historyJournal { + updates := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: historyEntry.Height, + BlockID: historyEntry.BlockID, + VRF: historyEntry.VRF, + BlockHeader: historyEntry.BlockHeader, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: historyEntry.Height, + AllDataEntries: historyEntry.Entries, + }, + } + handleBlockchainUpdate(updates, e.blockchainExtensionState, scheme, nc, updatesPublisher, false) + } +} diff --git a/pkg/blockchaininfo/proto_converters.go b/pkg/blockchaininfo/proto_converters.go index 728b84042..9d9b47dfb 100644 --- a/pkg/blockchaininfo/proto_converters.go +++ b/pkg/blockchaininfo/proto_converters.go @@ -8,7 +8,7 @@ import ( "github.com/wavesplatform/gowaves/pkg/proto" ) -func BUpdatesInfoToProto(blockInfo BUpdatesInfo, scheme proto.Scheme) (*g.BlockInfo, error) { +func BUpdatesInfoToProto(blockInfo proto.BUpdatesInfo, scheme proto.Scheme) (*g.BlockInfo, error) { var ( blockHeader *waves.Block_Header err error @@ -26,22 +26,22 @@ func BUpdatesInfoToProto(blockInfo BUpdatesInfo, scheme proto.Scheme) (*g.BlockI }, nil } -func BUpdatesInfoFromProto(blockInfoProto *g.BlockInfo) (BlockUpdatesInfo, error) { +func BUpdatesInfoFromProto(blockInfoProto *g.BlockInfo) (proto.BlockUpdatesInfo, error) { if blockInfoProto == nil { - return BlockUpdatesInfo{}, errors.New("empty block info") + return proto.BlockUpdatesInfo{}, errors.New("empty block info") } blockID, err := proto.NewBlockIDFromBytes(blockInfoProto.BlockID) if err != nil { - return BlockUpdatesInfo{}, errors.Wrap(err, "failed to convert block ID") + return proto.BlockUpdatesInfo{}, errors.Wrap(err, "failed to convert block ID") } var c proto.ProtobufConverter blockHeader, err := c.PartialBlockHeader(blockInfoProto.BlockHeader) if err != nil { - return BlockUpdatesInfo{}, errors.Wrap(err, "failed to convert block header") + return proto.BlockUpdatesInfo{}, errors.Wrap(err, "failed to convert block header") } blockHeader.ID = blockID // Set block ID to the one from the protobuf. vrf := proto.B58Bytes(blockInfoProto.VRF) - return BlockUpdatesInfo{ + return proto.BlockUpdatesInfo{ Height: blockInfoProto.Height, VRF: vrf, BlockID: blockID, @@ -49,7 +49,7 @@ func BUpdatesInfoFromProto(blockInfoProto *g.BlockInfo) (BlockUpdatesInfo, error }, nil } -func L2ContractDataEntriesToProto(contractData L2ContractDataEntries) *g.L2ContractDataEntries { +func L2ContractDataEntriesToProto(contractData proto.L2ContractDataEntries) *g.L2ContractDataEntries { var protobufDataEntries []*waves.DataEntry for _, entry := range contractData.AllDataEntries { entryProto := entry.ToProtobuf() @@ -64,19 +64,19 @@ func L2ContractDataEntriesToProto(contractData L2ContractDataEntries) *g.L2Contr func L2ContractDataEntriesFromProto( protoDataEntries *g.L2ContractDataEntries, scheme proto.Scheme, -) (L2ContractDataEntries, error) { +) (proto.L2ContractDataEntries, error) { if protoDataEntries == nil { - return L2ContractDataEntries{}, errors.New("empty contract data") + return proto.L2ContractDataEntries{}, errors.New("empty contract data") } converter := proto.ProtobufConverter{FallbackChainID: scheme} dataEntries := make([]proto.DataEntry, 0, len(protoDataEntries.DataEntries)) for _, protoEntry := range protoDataEntries.DataEntries { entry, err := converter.Entry(protoEntry) if err != nil { - return L2ContractDataEntries{}, errors.Wrap(err, "failed to convert data entry") + return proto.L2ContractDataEntries{}, errors.Wrap(err, "failed to convert data entry") } dataEntries = append(dataEntries, entry) } - return L2ContractDataEntries{AllDataEntries: dataEntries, Height: protoDataEntries.Height}, nil + return proto.L2ContractDataEntries{AllDataEntries: dataEntries, Height: protoDataEntries.Height}, nil } diff --git a/pkg/blockchaininfo/signals.go b/pkg/blockchaininfo/signals.go index 103a3c68e..015c46091 100644 --- a/pkg/blockchaininfo/signals.go +++ b/pkg/blockchaininfo/signals.go @@ -1,6 +1,30 @@ package blockchaininfo -import "github.com/nats-io/nats.go" +import ( + "strings" + + "github.com/nats-io/nats.go" +) + +const ( + TokenID = "tokenId" + AllMiners = "allMiners" + LastChainID = "lastChainId" + MinerReward = "minerReward" + Chain0Height = "chain0Height" + ThisEpochData = "thisEpochData" +) + +func ConstantContractKeys() []string { + return []string{ + TokenID, + AllMiners, + LastChainID, + MinerReward, + Chain0Height, + ThisEpochData, + } +} func SendRestartSignal(nc *nats.Conn) (*nats.Msg, error) { message := []byte(RequestRestartSubTopic) @@ -10,3 +34,8 @@ func SendRestartSignal(nc *nats.Conn) (*nats.Msg, error) { } return msg, nil } + +func SendConstantKeys(nc *nats.Conn) error { + constantKeys := strings.Join(ConstantContractKeys(), ",") + return nc.Publish(ConstantKeys, []byte(constantKeys)) +} diff --git a/pkg/blockchaininfo/topics.go b/pkg/blockchaininfo/topics.go index d6b4fd01e..d039b709b 100644 --- a/pkg/blockchaininfo/topics.go +++ b/pkg/blockchaininfo/topics.go @@ -4,4 +4,6 @@ package blockchaininfo const ( BlockUpdates = "block_topic" ContractUpdates = "contract_topic" + + ConstantKeys = "constant_keys" ) diff --git a/pkg/blockchaininfo/types.go b/pkg/blockchaininfo/types.go index c21c75e06..64f709e7c 100644 --- a/pkg/blockchaininfo/types.go +++ b/pkg/blockchaininfo/types.go @@ -2,7 +2,11 @@ package blockchaininfo import ( "bytes" + "fmt" + "math" + "sync" + "github.com/nats-io/nats.go" "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/proto" @@ -10,36 +14,239 @@ import ( const ( RootHashSize = 32 + + HistoryJournalLengthMax = 100 ) -// BlockUpdatesInfo Block updates. -type BlockUpdatesInfo struct { - Height uint64 `json:"height"` - VRF proto.B58Bytes `json:"vrf"` - BlockID proto.BlockID `json:"block_id"` - BlockHeader proto.BlockHeader `json:"block_header"` +type UpdatesPublisherInterface interface { + PublishUpdates(updates proto.BUpdatesInfo, + nc *nats.Conn, scheme proto.Scheme, l2ContractAddress string) error + L2ContractAddress() string +} + +type StateCacheRecord struct { + dataEntries map[string]proto.DataEntry + blockInfo proto.BlockUpdatesInfo +} + +func NewStateCacheRecord(dataEntries []proto.DataEntry, blockInfo proto.BlockUpdatesInfo) StateCacheRecord { + var stateCacheRecord StateCacheRecord + stateCacheRecord.dataEntries = make(map[string]proto.DataEntry) + + for _, dataEntry := range dataEntries { + stateCacheRecord.dataEntries[dataEntry.GetKey()] = dataEntry + } + stateCacheRecord.blockInfo = blockInfo + return stateCacheRecord +} + +type StateCache struct { + lock sync.Mutex + records map[proto.Height]StateCacheRecord + heights []uint64 +} + +func NewStateCache() *StateCache { + return &StateCache{ + records: make(map[proto.Height]StateCacheRecord), + } +} + +func (sc *StateCache) SearchValue(key string, height uint64) (proto.DataEntry, bool, error) { + sc.lock.Lock() + defer sc.lock.Unlock() + + if _, ok := sc.records[height]; !ok { + return nil, false, errors.New("the target height is not in cache") + } + if _, ok := sc.records[height].dataEntries[key]; !ok { + return nil, false, nil + } + return sc.records[height].dataEntries[key], true, nil +} + +func (sc *StateCache) SearchBlockInfo(height uint64) (proto.BlockUpdatesInfo, error) { + sc.lock.Lock() + defer sc.lock.Unlock() + + if _, ok := sc.records[height]; !ok { + return proto.BlockUpdatesInfo{}, errors.New("the target height is not in cache") + } + return sc.records[height].blockInfo, nil +} + +func (sc *StateCache) AddCacheRecord(height uint64, dataEntries []proto.DataEntry, blockInfo proto.BlockUpdatesInfo) { + sc.lock.Lock() + defer sc.lock.Unlock() + // clean the oldest record if the cache is too big + if len(sc.heights) > HistoryJournalLengthMax { + minHeight := sc.heights[0] + for _, v := range sc.heights { + if v < minHeight { + minHeight = v + } + } + delete(sc.records, minHeight) + } + stateCacheRecord := NewStateCacheRecord(dataEntries, blockInfo) + sc.records[height] = stateCacheRecord + sc.heights = append(sc.heights, height) +} + +func (sc *StateCache) RemoveCacheRecord(targetHeight uint64) { + sc.lock.Lock() + defer sc.lock.Unlock() + + delete(sc.records, targetHeight) + + for i, item := range sc.heights { + if item == targetHeight { + sc.heights = append(sc.heights[:i], sc.heights[i+1:]...) + } + } +} + +type HistoryEntry struct { + Height uint64 + BlockID proto.BlockID + VRF proto.B58Bytes + BlockHeader proto.BlockHeader + + Entries proto.DataEntries +} + +type HistoryJournal struct { + lock sync.Mutex + StateCache *StateCache + historyJournal [HistoryJournalLengthMax]HistoryEntry + top int + size int } -// L2ContractDataEntries L2 contract data entries. -type L2ContractDataEntries struct { - AllDataEntries []proto.DataEntry `json:"all_data_entries"` - Height uint64 `json:"height"` +// NewHistoryJournal создаёт и инициализирует новый экземпляр HistoryJournal. +func NewHistoryJournal() *HistoryJournal { + return &HistoryJournal{ + top: 0, + size: 0, + } +} + +func (hj *HistoryJournal) SetStateCache(stateCache *StateCache) { + hj.StateCache = stateCache } -type BUpdatesInfo struct { - BlockUpdatesInfo BlockUpdatesInfo - ContractUpdatesInfo L2ContractDataEntries +// FetchKeysUntilBlockID TODO write tests. +// FetchKeysUntilBlockID goes from top to bottom and fetches all keys. +// If the blockID is found, it returns the keys up to and including that element and true. +// If the blockID is not found - nil and false. +func (hj *HistoryJournal) FetchKeysUntilBlockID(blockID proto.BlockID) ([]string, bool) { + hj.lock.Lock() + defer hj.lock.Unlock() + + var keys []string + for i := 0; i < hj.size; i++ { + idx := (hj.top - 1 - i + HistoryJournalLengthMax) % HistoryJournalLengthMax + historyEntry := hj.historyJournal[idx] + + dataEntries := historyEntry.Entries + for _, dataEntry := range dataEntries { + keys = append(keys, dataEntry.GetKey()) + } + if historyEntry.BlockID == blockID { + return keys, true + } + } + + return nil, false +} + +// SearchByBlockID TODO write tests. +func (hj *HistoryJournal) SearchByBlockID(blockID proto.BlockID) (HistoryEntry, int, bool) { + hj.lock.Lock() + defer hj.lock.Unlock() + + // Iterate over the elements from the top (latest) to the bottom. + for i := 0; i < hj.size; i++ { + idx := (hj.top - 1 - i + HistoryJournalLengthMax) % HistoryJournalLengthMax + if hj.historyJournal[idx].BlockID == blockID { + return hj.historyJournal[idx], i, true + } + } + return HistoryEntry{}, -1, false +} + +// SearchByBlockID TODO write tests. +func (hj *HistoryJournal) TopHeight() (uint64, error) { + hj.lock.Lock() + defer hj.lock.Unlock() + + if hj.size == 0 { + return 0, errors.New("failed to pull the top height, history journal is empty") + } + + // Shift "top" back. + hj.top = (hj.top - 1 + HistoryJournalLengthMax) % HistoryJournalLengthMax + topHeight := hj.historyJournal[hj.top].Height + return topHeight, nil +} + +// CleanAfterRollback TODO write tests. +func (hj *HistoryJournal) CleanAfterRollback(latestHeightFromHistory uint64, heightAfterRollback uint64) error { + hj.lock.Lock() + defer hj.lock.Unlock() + + distance := latestHeightFromHistory - heightAfterRollback + if distance > math.MaxInt64 { + return fmt.Errorf("distance too large to fit in an int64") + } + dist := int64(distance) + + if int(dist) > hj.size { + return errors.New("distance out of range") + } + + // Remove the number of elements from the top to `distance`. + hj.top = (hj.top - int(dist) + HistoryJournalLengthMax) % HistoryJournalLengthMax + hj.size -= int(distance) + return nil +} + +func (hj *HistoryJournal) Push(v HistoryEntry) { + hj.lock.Lock() + defer hj.lock.Unlock() + hj.historyJournal[hj.top] = v // Add to top or rewrite the oldest element. + + hj.top = (hj.top + 1) % HistoryJournalLengthMax + + if hj.size < HistoryJournalLengthMax { + hj.size++ + } +} + +func (hj *HistoryJournal) Pop() (HistoryEntry, error) { + hj.lock.Lock() + defer hj.lock.Unlock() + + if hj.size == 0 { + return HistoryEntry{}, errors.New("failed to pop from the history journal, it's empty") + } + + // Shift "top" back. + hj.top = (hj.top - 1 + HistoryJournalLengthMax) % HistoryJournalLengthMax + entry := hj.historyJournal[hj.top] + hj.size-- + return entry, nil } type L2Requests struct { Restart bool } -func CompareBUpdatesInfo(current, previous BUpdatesInfo, - scheme proto.Scheme) (bool, BUpdatesInfo, error) { - changes := BUpdatesInfo{ - BlockUpdatesInfo: BlockUpdatesInfo{}, - ContractUpdatesInfo: L2ContractDataEntries{}, +func CompareBUpdatesInfo(current, previous proto.BUpdatesInfo, + scheme proto.Scheme) (bool, proto.BUpdatesInfo, error) { + changes := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{}, + ContractUpdatesInfo: proto.L2ContractDataEntries{}, } equal := true @@ -58,7 +265,7 @@ func CompareBUpdatesInfo(current, previous BUpdatesInfo, equalHeaders, err := compareBlockHeader(current.BlockUpdatesInfo.BlockHeader, previous.BlockUpdatesInfo.BlockHeader, scheme) if err != nil { - return false, BUpdatesInfo{}, err + return false, proto.BUpdatesInfo{}, err } if !equalHeaders { equal = false @@ -68,7 +275,7 @@ func CompareBUpdatesInfo(current, previous BUpdatesInfo, equalEntries, dataEntryChanges, err := compareDataEntries(current.ContractUpdatesInfo.AllDataEntries, previous.ContractUpdatesInfo.AllDataEntries) if err != nil { - return false, BUpdatesInfo{}, err + return false, proto.BUpdatesInfo{}, err } if !equalEntries { equal = false diff --git a/pkg/mock/blockchaininfo_types.go b/pkg/mock/blockchaininfo_types.go new file mode 100644 index 000000000..e46e70906 --- /dev/null +++ b/pkg/mock/blockchaininfo_types.go @@ -0,0 +1,131 @@ +// Code generated by mockery v2.50.1. DO NOT EDIT. + +package mock + +import ( + nats "github.com/nats-io/nats.go" + mock "github.com/stretchr/testify/mock" + + proto "github.com/wavesplatform/gowaves/pkg/proto" +) + +// MockUpdatesPublisherInterface is an autogenerated mock type for the UpdatesPublisherInterface type +type MockUpdatesPublisherInterface struct { + mock.Mock +} + +type MockUpdatesPublisherInterface_Expecter struct { + mock *mock.Mock +} + +func (_m *MockUpdatesPublisherInterface) EXPECT() *MockUpdatesPublisherInterface_Expecter { + return &MockUpdatesPublisherInterface_Expecter{mock: &_m.Mock} +} + +// L2ContractAddress provides a mock function with no fields +func (_m *MockUpdatesPublisherInterface) L2ContractAddress() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for L2ContractAddress") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockUpdatesPublisherInterface_L2ContractAddress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'L2ContractAddress' +type MockUpdatesPublisherInterface_L2ContractAddress_Call struct { + *mock.Call +} + +// L2ContractAddress is a helper method to define mock.On call +func (_e *MockUpdatesPublisherInterface_Expecter) L2ContractAddress() *MockUpdatesPublisherInterface_L2ContractAddress_Call { + return &MockUpdatesPublisherInterface_L2ContractAddress_Call{Call: _e.mock.On("L2ContractAddress")} +} + +func (_c *MockUpdatesPublisherInterface_L2ContractAddress_Call) Run(run func()) *MockUpdatesPublisherInterface_L2ContractAddress_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockUpdatesPublisherInterface_L2ContractAddress_Call) Return(_a0 string) *MockUpdatesPublisherInterface_L2ContractAddress_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockUpdatesPublisherInterface_L2ContractAddress_Call) RunAndReturn(run func() string) *MockUpdatesPublisherInterface_L2ContractAddress_Call { + _c.Call.Return(run) + return _c +} + +// PublishUpdates provides a mock function with given fields: updates, nc, scheme, l2ContractAddress +func (_m *MockUpdatesPublisherInterface) PublishUpdates(updates proto.BUpdatesInfo, nc *nats.Conn, scheme byte, l2ContractAddress string) error { + ret := _m.Called(updates, nc, scheme, l2ContractAddress) + + if len(ret) == 0 { + panic("no return value specified for PublishUpdates") + } + + var r0 error + if rf, ok := ret.Get(0).(func(proto.BUpdatesInfo, *nats.Conn, byte, string) error); ok { + r0 = rf(updates, nc, scheme, l2ContractAddress) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockUpdatesPublisherInterface_PublishUpdates_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PublishUpdates' +type MockUpdatesPublisherInterface_PublishUpdates_Call struct { + *mock.Call +} + +// PublishUpdates is a helper method to define mock.On call +// - updates proto.BUpdatesInfo +// - nc *nats.Conn +// - scheme byte +// - l2ContractAddress string +func (_e *MockUpdatesPublisherInterface_Expecter) PublishUpdates(updates interface{}, nc interface{}, scheme interface{}, l2ContractAddress interface{}) *MockUpdatesPublisherInterface_PublishUpdates_Call { + return &MockUpdatesPublisherInterface_PublishUpdates_Call{Call: _e.mock.On("PublishUpdates", updates, nc, scheme, l2ContractAddress)} +} + +func (_c *MockUpdatesPublisherInterface_PublishUpdates_Call) Run(run func(updates proto.BUpdatesInfo, nc *nats.Conn, scheme byte, l2ContractAddress string)) *MockUpdatesPublisherInterface_PublishUpdates_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(proto.BUpdatesInfo), args[1].(*nats.Conn), args[2].(byte), args[3].(string)) + }) + return _c +} + +func (_c *MockUpdatesPublisherInterface_PublishUpdates_Call) Return(_a0 error) *MockUpdatesPublisherInterface_PublishUpdates_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockUpdatesPublisherInterface_PublishUpdates_Call) RunAndReturn(run func(proto.BUpdatesInfo, *nats.Conn, byte, string) error) *MockUpdatesPublisherInterface_PublishUpdates_Call { + _c.Call.Return(run) + return _c +} + +// NewMockUpdatesPublisherInterface creates a new instance of MockUpdatesPublisherInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockUpdatesPublisherInterface(t interface { + mock.TestingT + Cleanup(func()) +}) *MockUpdatesPublisherInterface { + mock := &MockUpdatesPublisherInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/mock/state.go b/pkg/mock/state.go index ef14dc2b0..6aa4d2d99 100644 --- a/pkg/mock/state.go +++ b/pkg/mock/state.go @@ -688,6 +688,36 @@ func (mr *MockStateInfoMockRecorder) NewAddrTransactionsIterator(addr interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewAddrTransactionsIterator", reflect.TypeOf((*MockStateInfo)(nil).NewAddrTransactionsIterator), addr) } +// NewestBlockInfoByHeight mocks base method. +func (m *MockStateInfo) NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestBlockInfoByHeight", height) + ret0, _ := ret[0].(*proto.BlockInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestBlockInfoByHeight indicates an expected call of NewestBlockInfoByHeight. +func (mr *MockStateInfoMockRecorder) NewestBlockInfoByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestBlockInfoByHeight", reflect.TypeOf((*MockStateInfo)(nil).NewestBlockInfoByHeight), height) +} + +// NewestHeaderByHeight mocks base method. +func (m *MockStateInfo) NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestHeaderByHeight", height) + ret0, _ := ret[0].(*proto.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestHeaderByHeight indicates an expected call of NewestHeaderByHeight. +func (mr *MockStateInfoMockRecorder) NewestHeaderByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestHeaderByHeight", reflect.TypeOf((*MockStateInfo)(nil).NewestHeaderByHeight), height) +} + // NewestScriptByAccount mocks base method. func (m *MockStateInfo) NewestScriptByAccount(account proto.Recipient) (*ast.Tree, error) { m.ctrl.T.Helper() @@ -2091,6 +2121,36 @@ func (mr *MockStateMockRecorder) NewAddrTransactionsIterator(addr interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewAddrTransactionsIterator", reflect.TypeOf((*MockState)(nil).NewAddrTransactionsIterator), addr) } +// NewestBlockInfoByHeight mocks base method. +func (m *MockState) NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestBlockInfoByHeight", height) + ret0, _ := ret[0].(*proto.BlockInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestBlockInfoByHeight indicates an expected call of NewestBlockInfoByHeight. +func (mr *MockStateMockRecorder) NewestBlockInfoByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestBlockInfoByHeight", reflect.TypeOf((*MockState)(nil).NewestBlockInfoByHeight), height) +} + +// NewestHeaderByHeight mocks base method. +func (m *MockState) NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestHeaderByHeight", height) + ret0, _ := ret[0].(*proto.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestHeaderByHeight indicates an expected call of NewestHeaderByHeight. +func (mr *MockStateMockRecorder) NewestHeaderByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestHeaderByHeight", reflect.TypeOf((*MockState)(nil).NewestHeaderByHeight), height) +} + // NewestScriptByAccount mocks base method. func (m *MockState) NewestScriptByAccount(account proto.Recipient) (*ast.Tree, error) { m.ctrl.T.Helper() diff --git a/pkg/proto/blockchain_updates_types.go b/pkg/proto/blockchain_updates_types.go new file mode 100644 index 000000000..bfccc4437 --- /dev/null +++ b/pkg/proto/blockchain_updates_types.go @@ -0,0 +1,98 @@ +package proto + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" +) + +const ChannelWriteTimeout = 10 * time.Second + +type BUpdatesInfo struct { + BlockUpdatesInfo BlockUpdatesInfo + ContractUpdatesInfo L2ContractDataEntries +} + +// BlockUpdatesInfo Block updates. +type BlockUpdatesInfo struct { + Height uint64 `json:"height"` + VRF B58Bytes `json:"vrf"` + BlockID BlockID `json:"block_id"` + BlockHeader BlockHeader `json:"block_header"` +} + +// L2ContractDataEntries L2 contract data entries. +type L2ContractDataEntries struct { + AllDataEntries DataEntries `json:"all_data_entries"` + Height uint64 `json:"height"` +} + +type BlockchainUpdatesPluginInfo struct { + EnableBlockchainUpdatesPlugin bool + L2ContractAddress WavesAddress + FirstBlock *bool + Lock sync.Mutex + Ready bool + BUpdatesChannel chan<- BUpdatesInfo + ctx context.Context +} + +func NewBlockchainUpdatesPluginInfo(ctx context.Context, + l2Address WavesAddress, bUpdatesChannel chan<- BUpdatesInfo, + firstBlock *bool, + enableBlockchainUpdatesPlugin bool) *BlockchainUpdatesPluginInfo { + return &BlockchainUpdatesPluginInfo{ + L2ContractAddress: l2Address, + FirstBlock: firstBlock, + BUpdatesChannel: bUpdatesChannel, + ctx: ctx, + EnableBlockchainUpdatesPlugin: enableBlockchainUpdatesPlugin, + Ready: false, + } +} + +func (e *BlockchainUpdatesPluginInfo) IsReady() bool { + e.Lock.Lock() + defer e.Lock.Unlock() + return e.Ready +} + +func (e *BlockchainUpdatesPluginInfo) MakeExtensionReady() { + e.Lock.Lock() + defer e.Lock.Unlock() + e.Ready = true +} + +func (e *BlockchainUpdatesPluginInfo) Ctx() context.Context { + return e.ctx +} + +func (e *BlockchainUpdatesPluginInfo) FirstBlockDone() { + e.Lock.Lock() + defer e.Lock.Unlock() + *e.FirstBlock = false +} + +func (e *BlockchainUpdatesPluginInfo) WriteBUpdates(bUpdates BUpdatesInfo) { + if e.BUpdatesChannel == nil { + return + } + select { + case e.BUpdatesChannel <- bUpdates: + case <-time.After(ChannelWriteTimeout): + zap.S().Errorf("failed to write into the blockchain updates channel, out of time") + return + case <-e.ctx.Done(): + e.Close() + return + } +} + +func (e *BlockchainUpdatesPluginInfo) Close() { + if e.BUpdatesChannel != nil { + close(e.BUpdatesChannel) + } + e.BUpdatesChannel = nil +} diff --git a/pkg/state/api.go b/pkg/state/api.go index b861f9738..eec5fdd03 100644 --- a/pkg/state/api.go +++ b/pkg/state/api.go @@ -6,7 +6,6 @@ import ( "github.com/pkg/errors" - "github.com/wavesplatform/gowaves/pkg/blockchaininfo" "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/keyvalue" "github.com/wavesplatform/gowaves/pkg/libs/ntptime" @@ -37,9 +36,11 @@ type StateInfo interface { TopBlock() *proto.Block Block(blockID proto.BlockID) (*proto.Block, error) BlockByHeight(height proto.Height) (*proto.Block, error) + NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) // Header getters. Header(blockID proto.BlockID) (*proto.BlockHeader, error) HeaderByHeight(height proto.Height) (*proto.BlockHeader, error) + NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) // Height returns current blockchain height. Height() (proto.Height, error) // Height <---> blockID converters. @@ -233,9 +234,9 @@ func NewState( params StateParams, settings *settings.BlockchainSettings, enableLightNode bool, - bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension, + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo, ) (State, error) { - s, err := newStateManager(dataDir, amend, params, settings, enableLightNode, bUpdatesExtension) + s, err := newStateManager(dataDir, amend, params, settings, enableLightNode, bUpdatesPluginInfo) if err != nil { return nil, errors.Wrap(err, "failed to create new state instance") } diff --git a/pkg/state/appender.go b/pkg/state/appender.go index 2fe651333..3c8e82112 100644 --- a/pkg/state/appender.go +++ b/pkg/state/appender.go @@ -8,8 +8,6 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" - "github.com/wavesplatform/gowaves/pkg/blockchaininfo" - "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/errs" "github.com/wavesplatform/gowaves/pkg/proto" @@ -58,7 +56,7 @@ type txAppender struct { // appending transactions. buildApiData bool - bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo } func newTxAppender( @@ -69,7 +67,7 @@ func newTxAppender( stateDB *stateDB, atx *addressTransactions, snapshotApplier *blockSnapshotsApplier, - bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension, + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo, ) (*txAppender, error) { buildAPIData, err := stateDB.stateStoresApiData() if err != nil { @@ -103,22 +101,22 @@ func newTxAppender( ia := newInvokeApplier(state, sc, txHandler, stor, settings, blockDiffer, diffStorInvoke, diffApplier) ethKindResolver := proto.NewEthereumTransactionKindResolver(state, settings.AddressSchemeCharacter) return &txAppender{ - sc: sc, - ia: ia, - rw: rw, - blockInfoProvider: state, - atx: atx, - stor: stor, - settings: settings, - txHandler: txHandler, - blockDiffer: blockDiffer, - recentTxIds: make(map[string]struct{}), - diffStor: diffStor, - diffStorInvoke: diffStorInvoke, - diffApplier: diffApplier, - buildApiData: buildAPIData, - ethTxKindResolver: ethKindResolver, - bUpdatesExtension: bUpdatesExtension, + sc: sc, + ia: ia, + rw: rw, + blockInfoProvider: state, + atx: atx, + stor: stor, + settings: settings, + txHandler: txHandler, + blockDiffer: blockDiffer, + recentTxIds: make(map[string]struct{}), + diffStor: diffStor, + diffStorInvoke: diffStorInvoke, + diffApplier: diffApplier, + buildApiData: buildAPIData, + ethTxKindResolver: ethKindResolver, + bUpdatesPluginInfo: bUpdatesPluginInfo, }, nil } @@ -844,10 +842,12 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error { } // write updates into the updatesChannel here - if a.bUpdatesExtension != nil && a.bUpdatesExtension.EnableBlockchainUpdatesPlugin() { - err = a.updateBlockchainUpdateInfo(blockInfo, params.block, blockSnapshot) - if err != nil { - return errors.Errorf("failed to request blockchain info from L2 smart contract state, %v", err) + if a.bUpdatesPluginInfo != nil && a.bUpdatesPluginInfo.EnableBlockchainUpdatesPlugin { + if a.bUpdatesPluginInfo.IsReady() { + err = a.updateBlockchainUpdateInfo(blockInfo, params.block, blockSnapshot) + if err != nil { + return errors.Errorf("failed to request blockchain info from L2 smart contract state, %v", err) + } } } @@ -874,44 +874,52 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error { func (a *txAppender) updateBlockchainUpdateInfo(blockInfo *proto.BlockInfo, blockHeader *proto.BlockHeader, blockSnapshot proto.BlockSnapshot) error { + bUpdatesInfo := BuildBlockUpdatesInfoFromSnapshot(blockInfo, blockHeader, blockSnapshot, + a.bUpdatesPluginInfo.L2ContractAddress) + + if *a.bUpdatesPluginInfo.FirstBlock { + dataEntries, err := a.ia.state.RetrieveEntries(proto.NewRecipientFromAddress(a.bUpdatesPluginInfo.L2ContractAddress)) + if err != nil && !errors.Is(err, proto.ErrNotFound) { + return err + } + bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = dataEntries + a.bUpdatesPluginInfo.FirstBlockDone() + a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo) + return nil + } + + a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo) + return nil +} + +func BuildBlockUpdatesInfoFromSnapshot(blockInfo *proto.BlockInfo, blockHeader *proto.BlockHeader, + blockSnapshot proto.BlockSnapshot, l2ContractAddress proto.WavesAddress) proto.BUpdatesInfo { blockID := blockHeader.BlockID() - bUpdatesInfo := blockchaininfo.BUpdatesInfo{ - BlockUpdatesInfo: blockchaininfo.BlockUpdatesInfo{ + bUpdatesInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ Height: blockInfo.Height, VRF: blockInfo.VRF, BlockID: blockID, BlockHeader: *blockHeader, }, - ContractUpdatesInfo: blockchaininfo.L2ContractDataEntries{ + ContractUpdatesInfo: proto.L2ContractDataEntries{ AllDataEntries: nil, Height: blockInfo.Height, }, } - if a.bUpdatesExtension.IsFirstRequestedBlock() { - dataEntries, err := a.ia.state.RetrieveEntries(proto.NewRecipientFromAddress(a.bUpdatesExtension.L2ContractAddress())) - if err != nil && !errors.Is(err, proto.ErrNotFound) { - return err - } - bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = dataEntries - a.bUpdatesExtension.FirstBlockDone() - a.bUpdatesExtension.WriteBUpdates(bUpdatesInfo) - return nil - } // Write the L2 contract updates into the structure. for _, txSnapshots := range blockSnapshot.TxSnapshots { for _, snapshot := range txSnapshots { if dataEntriesSnapshot, ok := snapshot.(*proto.DataEntriesSnapshot); ok { - if dataEntriesSnapshot.Address == a.bUpdatesExtension.L2ContractAddress() { + if dataEntriesSnapshot.Address == l2ContractAddress { bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = append(bUpdatesInfo.ContractUpdatesInfo.AllDataEntries, dataEntriesSnapshot.DataEntries...) } } } } - - a.bUpdatesExtension.WriteBUpdates(bUpdatesInfo) - return nil + return bUpdatesInfo } func (a *txAppender) createCheckerInfo(params *appendBlockParams) (*checkerInfo, error) { diff --git a/pkg/state/database.go b/pkg/state/database.go index c602746a3..dd7460200 100644 --- a/pkg/state/database.go +++ b/pkg/state/database.go @@ -34,12 +34,14 @@ func (inf *stateInfo) unmarshalBinary(data []byte) error { return cbor.Unmarshal(data, inf) } +const forceUpdateStateVersion = false + func saveStateInfo(db keyvalue.KeyValue, params StateParams) error { has, err := db.Has(stateInfoKeyBytes) if err != nil { return err } - if has { + if has && !forceUpdateStateVersion { return nil } info := &stateInfo{ @@ -47,6 +49,17 @@ func saveStateInfo(db keyvalue.KeyValue, params StateParams) error { HasExtendedApiData: params.StoreExtendedApiData, HasStateHashes: params.BuildStateHashes, } + if has && forceUpdateStateVersion { + data, dbErr := db.Get(stateInfoKeyBytes) + if dbErr != nil { + return dbErr + } + if ubErr := info.unmarshalBinary(data); ubErr != nil { + return ubErr + } + info.Version = StateVersion + } + return putStateInfoToDB(db, info) } diff --git a/pkg/state/history_storage.go b/pkg/state/history_storage.go index 9c3a25acb..7978322a8 100644 --- a/pkg/state/history_storage.go +++ b/pkg/state/history_storage.go @@ -593,7 +593,6 @@ func (hs *historyStorage) getHistory(key []byte, update bool) (*historyRecord, e // So we do both read and write under same lock. hs.writeLock.Lock() defer hs.writeLock.Unlock() - historyBytes, err := hs.db.Get(key) if err != nil { return nil, err // `keyvalue.ErrNotFound` is possible here along with other unwrapped DB errors diff --git a/pkg/state/state.go b/pkg/state/state.go index 314cfef37..7f1047bd7 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -16,7 +16,6 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" - "github.com/wavesplatform/gowaves/pkg/blockchaininfo" "github.com/wavesplatform/gowaves/pkg/consensus" "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/errs" @@ -520,7 +519,7 @@ func newStateManager( params StateParams, settings *settings.BlockchainSettings, enableLightNode bool, - bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension, + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo, ) (_ *stateManager, retErr error) { if err := validateSettings(settings); err != nil { return nil, err @@ -605,7 +604,7 @@ func newStateManager( // Set fields which depend on state. // Consensus validator is needed to check block headers. snapshotApplier := newBlockSnapshotsApplier(nil, newSnapshotApplierStorages(stor, rw)) - appender, err := newTxAppender(state, rw, stor, settings, sdb, atx, &snapshotApplier, bUpdatesExtension) + appender, err := newTxAppender(state, rw, stor, settings, sdb, atx, &snapshotApplier, bUpdatesPluginInfo) if err != nil { return nil, wrapErr(Other, err) } diff --git a/pkg/state/threadsafe_wrapper.go b/pkg/state/threadsafe_wrapper.go index 461f8f457..6ffe84b06 100644 --- a/pkg/state/threadsafe_wrapper.go +++ b/pkg/state/threadsafe_wrapper.go @@ -28,6 +28,12 @@ func (a *ThreadSafeReadWrapper) BlockVRF(blockHeader *proto.BlockHeader, blockHe return a.s.BlockVRF(blockHeader, blockHeight) } +func (a *ThreadSafeReadWrapper) NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.NewestBlockInfoByHeight(height) +} + func (a *ThreadSafeReadWrapper) MapR(f func(StateInfo) (interface{}, error)) (interface{}, error) { a.mu.RLock() defer a.mu.RUnlock() @@ -62,6 +68,12 @@ func (a *ThreadSafeReadWrapper) HeaderByHeight(height proto.Height) (*proto.Bloc return a.s.HeaderByHeight(height) } +func (a *ThreadSafeReadWrapper) NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.NewestHeaderByHeight(height) +} + func (a *ThreadSafeReadWrapper) Height() (proto.Height, error) { a.mu.RLock() defer a.mu.RUnlock()