Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support rollback #1610

Merged
merged 15 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
quiet: False
dir: "{{.InterfaceDir}}/mocks"
mockname: "Mock{{.InterfaceName}}"
with-expecter: true
issue-845-fix: True
filename: "{{.InterfaceName | snakecase}}.go"

packages:
Expand All @@ -9,3 +10,6 @@ packages:
Header:
Protocol:
Handler:
github.com/wavesplatform/gowaves/pkg/blockchaininfo:
interfaces:
UpdatesPublisherInterface:
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ mock:
mockgen -source pkg/p2p/peer/peer.go -destination pkg/mock/peer.go -package mock Peer
mockgen -source pkg/state/api.go -destination pkg/mock/state.go -package mock State
mockgen -source pkg/grpc/server/api.go -destination pkg/mock/grpc.go -package mock GrpcHandlers
mockery --dir=pkg/mock --filename=blockchaininfo_types.go --outpkg=mock # The interface name must be specified in .mockery.yaml, see examples there.


proto:
@protoc --proto_path=pkg/grpc/protobuf-schemas/proto/ --go_out=./ --go_opt=module=$(MODULE) --go-vtproto_out=./ --go-vtproto_opt=features=marshal_strict+unmarshal+size --go-vtproto_opt=module=$(MODULE) pkg/grpc/protobuf-schemas/proto/waves/*.proto
Expand Down
6 changes: 6 additions & 0 deletions cmd/blockchaininfo/nats_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func main() {
}
defer nc.Close()

keysErr := blockchaininfo.SendConstantKeys(nc)
if keysErr != nil {
zap.S().Fatalf("Failed to send constant keys: %v", keysErr)
return
}

_, err = nc.Subscribe(blockchaininfo.BlockUpdates, func(msg *nats.Msg) {
receiveBlockUpdates(msg)
})
Expand Down
85 changes: 57 additions & 28 deletions cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,20 @@ func run(nc *config) (retErr error) {
return nil
}

func initBlockchainUpdatesPlugin(ctx context.Context,
l2addressContract string,
enableBlockchainUpdatesPlugin bool,
updatesChannel chan<- proto.BUpdatesInfo, firstBlock *bool,
) (*proto.BlockchainUpdatesPluginInfo, error) {
l2address, cnvrtErr := proto.NewAddressFromString(l2addressContract)
if cnvrtErr != nil {
return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2addressContract)
}
bUpdatesPluginInfo := proto.NewBlockchainUpdatesPluginInfo(ctx, l2address, updatesChannel,
firstBlock, enableBlockchainUpdatesPlugin)
return bUpdatesPluginInfo, nil
}

func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) {
cfg, err := blockchainSettings(nc)
if err != nil {
Expand Down Expand Up @@ -402,25 +416,44 @@ func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) {
return nil, errors.Wrap(err, "failed to create state parameters")
}

updatesChannel := make(chan proto.BUpdatesInfo)
extensionReady := make(chan struct{})
firstBlock := false
bUpdatesPluginInfo, initErr := initBlockchainUpdatesPlugin(ctx, nc.BlockchainUpdatesL2Address,
nc.enableBlockchainUpdatesPlugin, updatesChannel, &firstBlock)
if initErr != nil {
return nil, errors.Wrap(err, "failed to initialize blockchain updates plugin")
}
st, err := state.NewState(path, true, params, cfg, nc.enableLightMode, bUpdatesPluginInfo)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize node's state")
}
defer func() { retErr = closeIfErrorf(st, retErr, "failed to close state") }()
go func() {
<-extensionReady
bUpdatesPluginInfo.MakeExtensionReady()
close(extensionReady)
}()

var bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension
if nc.enableBlockchainUpdatesPlugin {
var bUErr error
bUpdatesExtension, bUErr = runBlockchainUpdatesPlugin(ctx, cfg, nc.BlockchainUpdatesL2Address)
bUpdatesExtension, bUErr = initializeBlockchainUpdatesExtension(ctx, cfg, nc.BlockchainUpdatesL2Address,
updatesChannel, &firstBlock, st, extensionReady)
if bUErr != nil {
return nil, errors.Wrap(bUErr, "failed to run blockchain updates plugin")
}
go bUpdatesExtension.ReceiveSignals()
go func() {
publshrErr := bUpdatesExtension.RunBlockchainUpdatesPublisher(ctx,
cfg.AddressSchemeCharacter)
if publshrErr != nil {
zap.S().Fatalf("Failed to run blockchain updates publisher: %v", publshrErr)
}
}()
zap.S().Info("The blockchain info extension started pulling info from smart contract address",
nc.BlockchainUpdatesL2Address)
}

// Send updatesChannel into BlockchainSettings. Write updates into this channel
st, err := state.NewState(path, true, params, cfg, nc.enableLightMode, bUpdatesExtension)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize node's state")
}
defer func() { retErr = closeIfErrorf(st, retErr, "failed to close state") }()

features, err := minerFeatures(st, nc.minerVoteFeatures)
if err != nil {
return nil, errors.Wrap(err, "failed to parse and validate miner features")
Expand Down Expand Up @@ -817,34 +850,30 @@ func runAPIs(
return nil
}

func runBlockchainUpdatesPlugin(
func initializeBlockchainUpdatesExtension(
ctx context.Context,
cfg *settings.BlockchainSettings,
l2ContractAddress string,
updatesChannel chan proto.BUpdatesInfo,
firstBlock *bool,
state state.State,
extensionReady chan<- struct{},
) (*blockchaininfo.BlockchainUpdatesExtension, error) {
l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddress)
if cnvrtErr != nil {
return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2ContractAddress)
}

bUpdatesExtensionState := blockchaininfo.NewBUpdatesExtensionState(
bUpdatesExtensionState, err := blockchaininfo.NewBUpdatesExtensionState(
blockchaininfo.StoreBlocksLimit,
cfg.AddressSchemeCharacter,
l2ContractAddress,
state,
)

updatesChannel := make(chan blockchaininfo.BUpdatesInfo)
requestsChannel := make(chan blockchaininfo.L2Requests)
go func() {
err := bUpdatesExtensionState.RunBlockchainUpdatesPublisher(ctx, updatesChannel,
cfg.AddressSchemeCharacter, requestsChannel)
if err != nil {
zap.S().Fatalf("Failed to run blockchain updates publisher: %v", err)
}
}()

if err != nil {
return nil, errors.Wrap(err, "failed to initialize blockchain updates extension state")
}
l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddress)
if cnvrtErr != nil {
return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2ContractAddress)
}
return blockchaininfo.NewBlockchainUpdatesExtension(ctx, l2address, updatesChannel,
requestsChannel, bUpdatesExtensionState), nil
bUpdatesExtensionState, firstBlock, extensionReady), nil
}

func FromArgs(scheme proto.Scheme, c *config) func(s *settings.NodeSettings) error {
Expand Down
Loading
Loading