Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions auth/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,19 @@ func (suite *QuerierTestSuite) TestQueryParams() {
require.Equal(t, uint64(8), params.TxSizeCostPerByte)

{
// When params are not set, querier should return zero values instead of panicking
happ := app.Setup(true)
ctx := happ.BaseApp.NewContext(true, abci.Header{})
querier := auth.NewQuerier(happ.AccountKeeper)
require.Panics(t, func() {
querier(ctx, path, req)
})
res, err = querier(ctx, path, req)
require.NoError(t, err)
require.NotNil(t, res)

var zeroParams types.Params
err4 := json.Unmarshal(res, &zeroParams)
require.NoError(t, err4)
// Params should be zero values when not initialized
require.Equal(t, uint64(0), zeroParams.MaxMemoCharacters)
require.Equal(t, uint64(0), zeroParams.TxSigLimit)
}
}
26 changes: 17 additions & 9 deletions bridge/setu/listener/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
checkpointTypes "github.com/maticnetwork/heimdall/checkpoint/types"
clerkTypes "github.com/maticnetwork/heimdall/clerk/types"
featureManagerTypes "github.com/maticnetwork/heimdall/featuremanager/types"
slashingTypes "github.com/maticnetwork/heimdall/slashing/types"
stakingTypes "github.com/maticnetwork/heimdall/staking/types"
htype "github.com/maticnetwork/heimdall/types"
Expand Down Expand Up @@ -48,8 +47,17 @@ func (hl *HeimdallListener) Start() error {

// Heimdall pollIntervall = (minimal pollInterval of rootchain and matichain)
pollInterval := helper.GetConfig().EthSyncerPollInterval
if helper.GetConfig().CheckpointerPollInterval < helper.GetConfig().EthSyncerPollInterval {
pollInterval = helper.GetConfig().CheckpointerPollInterval

checkpointPollInterval := helper.GetConfig().CheckpointerPollInterval

// fetch initial checkpoint params (will retry up to 10 times or exit service)
checkpointParams := util.GetCheckpointParamsWithRetry(hl.cliCtx)
if checkpointParams.CheckpointPollInterval > 0 {
checkpointPollInterval = checkpointParams.CheckpointPollInterval
}

if checkpointPollInterval < helper.GetConfig().EthSyncerPollInterval {
pollInterval = checkpointPollInterval
}

hl.Logger.Info("Start polling for events", "pollInterval", pollInterval)
Expand Down Expand Up @@ -293,12 +301,12 @@ func (hl *HeimdallListener) StartPollingEventRecord(ctx context.Context, pollInt
}

func (hl *HeimdallListener) loadEventRecords(ctx context.Context, pollInterval time.Duration) {
targetFeature, err := util.GetTargetFeatureConfig(hl.cliCtx, featureManagerTypes.DynamicCheckpoint)
if err != nil || !targetFeature.IsOpen {
hl.Logger.Info("Feature not supported... goroutine exists")

return
}
//targetFeature, err := util.GetTargetFeatureConfig(hl.cliCtx, featureManagerTypes.DynamicCheckpoint)
//if err != nil || !targetFeature.IsOpen {
// hl.Logger.Info("Feature not supported... goroutine exists")
//
// return
//}

if atomic.LoadUint32(&hl.stateSyncedInitializationRun) == 1 {
hl.Logger.Info("Last ProcessEventRecords not finished... goroutine exists")
Expand Down
11 changes: 9 additions & 2 deletions bridge/setu/listener/maticchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/RichardKnop/machinery/v1/tasks"
"github.com/maticnetwork/heimdall/bridge/setu/util"
"github.com/maticnetwork/heimdall/helper"
)

Expand Down Expand Up @@ -33,9 +34,15 @@ func (ml *MaticChainListener) Start() error {
// start header process
go ml.StartHeaderProcess(headerCtx)

ml.Logger.Info("Start polling for header blocks", "pollInterval", helper.GetConfig().CheckpointerPollInterval)
pollInterval := helper.GetConfig().CheckpointerPollInterval
params := util.GetCheckpointParamsWithRetry(ml.cliCtx)
if params.CheckpointPollInterval > 0 {
pollInterval = params.CheckpointPollInterval
}

ml.Logger.Info("Start polling for header blocks", "pollInterval", pollInterval)

go ml.StartPolling(ctx, helper.GetConfig().CheckpointerPollInterval, true, nil)
go ml.StartPolling(ctx, pollInterval, true, nil)

// subscribed to new head
ml.Logger.Info("Subscribed to new head")
Expand Down
62 changes: 49 additions & 13 deletions bridge/setu/processor/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,23 @@ func (cp *CheckpointProcessor) startPolling(ctx context.Context) {
now := time.Now()
baseTime := time.Unix(0, 0)
// no-ack ticker interval keep same with checkpoint interval
noAckInterval := helper.GetConfig().CheckpointerPollInterval
checkpointPollInterval := helper.GetConfig().CheckpointerPollInterval

// fetch initial checkpoint params (will retry up to 10 times or exit service)
checkpointParams := util.GetCheckpointParamsWithRetry(cp.cliCtx)
if checkpointParams.CheckpointPollInterval > 0 {
checkpointPollInterval = checkpointParams.CheckpointPollInterval
}

// adjust no-ack ticker to tick at the middle of checkpoint interval
firstIntervalForNoAck := noAckInterval - (now.UTC().Sub(baseTime) % noAckInterval) - noAckInterval/2 // nolint: gomnd
firstIntervalForNoAck := checkpointPollInterval - (now.UTC().Sub(baseTime) % checkpointPollInterval) - checkpointPollInterval/2 // nolint: gomnd
if firstIntervalForNoAck <= 0 {
firstIntervalForNoAck += noAckInterval
firstIntervalForNoAck += checkpointPollInterval
}

tickerForNoAck := time.NewTicker(firstIntervalForNoAck)
syncInterval := helper.GetConfig().CheckpointerPollInterval / 2
syncInterval := checkpointPollInterval / 2
noAckInterval := checkpointPollInterval
tickerForSync := time.NewTicker(syncInterval)
// stop ticker when everything done
defer tickerForNoAck.Stop()
Expand Down Expand Up @@ -568,10 +576,10 @@ func (cp *CheckpointProcessor) createAndSendCheckpointToHeimdall(checkpointConte
latestCheckpoint, err := util.GetlastestCheckpoint(cp.cliCtx, rootChain)
// event checkpoint is older than or equal to latest checkpoint
if err == nil && latestCheckpoint != nil && latestCheckpoint.EndBlock+1 < start {
cp.Logger.Debug("Need to resubmit Checkpoint ack first", "start", start, "last_end", latestCheckpoint.EndBlock)
cp.Logger.Info("Need to resubmit Checkpoint ack first", "start", start, "last_end", latestCheckpoint.EndBlock)
err := cp.resubmitCheckpointAck(checkpointContext, rootChain)
if err != nil {
cp.Logger.Info("Error while resubmit checkpoint ack", "root", rootChain, "err", err)
cp.Logger.Error("Error while resubmit checkpoint ack", "root", rootChain, "err", err)
return err
}
return nil
Expand Down Expand Up @@ -840,16 +848,36 @@ func (cp *CheckpointProcessor) checkIfNoAckIsRequired(checkpointContext *Checkpo
currentTime := time.Now().UTC()

timeDiff := currentTime.Sub(checkpointCreationTime)
if timeDiff.Seconds() >= helper.GetConfig().CheckpointerPollInterval.Seconds() && index == 0 {
index = math.Floor(timeDiff.Seconds() / helper.GetConfig().CheckpointerPollInterval.Seconds())

// checkpoint params
checkpointParams := checkpointContext.CheckpointParams

var checkpointPollInterval time.Duration
if checkpointParams.CheckpointPollInterval > 0 {
checkpointPollInterval = checkpointParams.CheckpointPollInterval
} else {
checkpointPollInterval = helper.GetConfig().CheckpointerPollInterval
}

if index == 0 {
var checkpointTimeout time.Duration
isOpen, tronMaxLength, err := cp.getTronDynamicCheckpointProposalWithErr()
if err != nil {
cp.Logger.Error("failed to check if no ack is required. Error while fetching dynamic checkpoint feature", "error", err)
return false, uint64(index)
}
if isOpen {
checkpointTimeout, _ = helper.CalcCheckpointTimeout(tronMaxLength, checkpointPollInterval)
} else {
checkpointTimeout = checkpointPollInterval
}

// checkpoint params
checkpointParams := checkpointContext.CheckpointParams
if timeDiff.Seconds() >= checkpointTimeout.Seconds() && index == 0 {
index = math.Floor(timeDiff.Seconds() / checkpointTimeout.Seconds())
}

if index == 0 {
return false, uint64(index)
}

// check if difference between no-ack time and current time
lastNoAck := cp.getLastNoAckTime()
Expand Down Expand Up @@ -991,9 +1019,7 @@ func (cp *CheckpointProcessor) Stop() {
cp.cancelNoACKPolling()
}

//
// utils
//
func (cp *CheckpointProcessor) getCheckpointContext(rootChain string) (*CheckpointContext, error) {
// fetch chain params for different root chains
chainmanagerParams, err := util.GetNewChainParams(cp.cliCtx, rootChain)
Expand Down Expand Up @@ -1090,3 +1116,13 @@ func (cp *CheckpointProcessor) getDynamicCheckpointProposal(rootType string) (bo

return fea.IsOpen, fea.IntConf[strings.ToLower(rootType)] != 0, fea.IntConf["maxLength"]
}

func (cp *CheckpointProcessor) getTronDynamicCheckpointProposalWithErr() (bool, int, error) {
fea, err := util.GetTronDynamicCheckpointFeature(cp.cliCtx)
if err != nil {
cp.Logger.Error("Error while fetching dynamic checkpoint feature", "error", err)

return false, 0, err
}
return fea.IsOpen, fea.IntConf["maxLength"], err
}
2 changes: 1 addition & 1 deletion bridge/setu/processor/checkpointsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (cp *CheckpointProcessor) sendCheckpointSyncToStakeChain(eventBytes string,
} else {
txHash := common.FromHex(txHash)
if err := cp.createAndSendCheckpointSyncToTron(checkpointContext, number, startBlock, endBlock, rootChain, blockHeight, txHash); err != nil {
cp.Logger.Error("Error sending checkpoint to rootchain", "error", err)
cp.Logger.Error("Error sending checkpoint sync to rootchain", "error", err)
return err
}
}
Expand Down
36 changes: 34 additions & 2 deletions bridge/setu/processor/troncheckpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func (cp *CheckpointProcessor) nextExpectedTronCheckpoint(checkpointContext *Che
"end", end,
)
}

// Check cross-chain transactions for Tron dynamic checkpoint feature
if !cp.checkCrossChainForTron(start, end, checkpointParams.MaxCheckpointLength) {
end = start
}

// Handle when block producers go down
if end == 0 || end == start || (0 < diff && diff < checkpointParams.AvgCheckpointLength) {
cp.Logger.Debug("Fetching last header block to calculate time")
Expand Down Expand Up @@ -137,10 +143,10 @@ func (cp *CheckpointProcessor) createAndSendTronCheckpointToHeimdall(checkpointC
latestCheckpoint, err := util.GetlastestCheckpoint(cp.cliCtx, hmTypes.RootChainTypeTron)
// event checkpoint is older than or equal to latest checkpoint
if err == nil && latestCheckpoint != nil && latestCheckpoint.EndBlock+1 < start {
cp.Logger.Debug("Need to resubmit Checkpoint ack first", "start", start, "last_end", latestCheckpoint.EndBlock)
cp.Logger.Info("Need to resubmit Checkpoint ack first", "start", start, "last_end", latestCheckpoint.EndBlock)
err := cp.resubmitTronCheckpointAck(checkpointContext)
if err != nil {
cp.Logger.Info("Error while resubmit checkpoint ack", "root", hmTypes.RootChainTypeTron, "err", err)
cp.Logger.Error("Error while resubmit checkpoint ack", "root", hmTypes.RootChainTypeTron, "err", err)
return err
}
return nil
Expand Down Expand Up @@ -343,3 +349,29 @@ func (cp *CheckpointProcessor) resubmitTronCheckpointAck(checkpointContext *Chec

return nil
}

// checkCrossChainForTron checks if checkpoint should be submitted based on Tron dynamic checkpoint feature
// This method implements the same logic as checkCrossChain but specifically for Tron chain
func (cp *CheckpointProcessor) checkCrossChainForTron(start uint64, end uint64, maxCheckpointLengthParam uint64) bool {
// Get Tron dynamic checkpoint feature configuration
isOpen, maxLength, err := cp.getTronDynamicCheckpointProposalWithErr()
if err != nil || !isOpen {
// If feature is not enabled or error occurred, allow checkpoint submission
return true
}

// Validate maxLength parameter
if maxCheckpointLengthParam < uint64(maxLength) {
cp.Logger.Error("proposal feature-tron-dynamic-checkpoint maxlength is too long",
"maxLength", maxLength, "MaxCheckpointLength", maxCheckpointLengthParam)
return true
}

// If checkpoint length already reached maxLength, allow submission
if end-start+1 >= uint64(maxLength) {
return true
}

// Check if there are cross-chain transactions for Tron
return cp.hasCrossChainTx(start, end, hmTypes.RootChainTypeTron)
}
38 changes: 38 additions & 0 deletions bridge/setu/util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,40 @@ func GetCheckpointParams(cliCtx cliContext.CLIContext) (*checkpointTypes.Params,
return &params, nil
}

// GetCheckpointParamsWithRetry guarantees successful retrieval of checkpoint parameters
// by retrying up to 10 times. If it fails after 10 attempts, the service will exit.
func GetCheckpointParamsWithRetry(cliCtx cliContext.CLIContext) *checkpointTypes.Params {
const maxRetries = 10
retryDelay := 1 * time.Second
maxRetryDelay := 30 * time.Second

for attempt := 1; attempt <= maxRetries; attempt++ {
params, err := GetCheckpointParams(cliCtx)
if err == nil {
logger.Info("Successfully fetched checkpoint params", "attempt", attempt)
return params
}

logger.Error("Failed to fetch checkpoint params, retrying...",
"err", err, "attempt", attempt, "maxRetries", maxRetries, "retryAfter", retryDelay)

if attempt < maxRetries {
time.Sleep(retryDelay)

// Exponential backoff with cap
retryDelay *= 2
if retryDelay > maxRetryDelay {
retryDelay = maxRetryDelay
}
}
}

logger.Error("Unexpected: exceeded retry loop without returning or exiting")

panic(errors.New("Failed to fetch checkpoint params"))
return nil
}

// GetBufferedCheckpoint return checkpoint from bueffer
func GetBufferedCheckpoint(cliCtx cliContext.CLIContext, rootChain string) (*hmtypes.Checkpoint, error) {
response, err := helper.FetchFromAPI(
Expand Down Expand Up @@ -570,6 +604,10 @@ func GetDynamicCheckpointFeature(cliCtx cliContext.CLIContext) (*featureManagerT
return GetTargetFeatureConfig(cliCtx, featureManagerTypes.DynamicCheckpoint)
}

func GetTronDynamicCheckpointFeature(cliCtx cliContext.CLIContext) (*featureManagerTypes.PlainFeatureData, error) {
return GetTargetFeatureConfig(cliCtx, featureManagerTypes.TronDynamicCheckpoint)
}

func GetFinalizedEthOpen(cliCtx cliContext.CLIContext) bool {
feature, err := GetTargetFeatureConfig(cliCtx, featureManagerTypes.FinalizedEth)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions chainmanager/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,18 @@ func (suite *QuerierTestSuite) TestQueryParams() {
require.Equal(t, defaultParams.ChainParams, params.ChainParams)

{
// When params are not set, querier should return zero values instead of panicking
rapp := app.Setup(true)
ctx := rapp.BaseApp.NewContext(true, abci.Header{})
querier := chainmanager.NewQuerier(rapp.ChainKeeper)
require.Panics(t, func() {
querier(ctx, path, req)
})
res, err = querier(ctx, path, req)
require.NoError(t, err)
require.NotNil(t, res)

var zeroParams types.Params
json.Unmarshal(res, &zeroParams)
// Params should be zero values when not initialized
require.Equal(t, uint64(0), zeroParams.MainchainTxConfirmations)
require.Equal(t, uint64(0), zeroParams.MaticchainTxConfirmations)
}
}
22 changes: 20 additions & 2 deletions checkpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,32 @@ func handleMsgCheckpointNoAck(ctx sdk.Context, msg types.MsgCheckpointNoAck, k K
// Get buffer time from params
bufferTime := k.GetParams(ctx).CheckpointBufferTime

var checkpointPollInterval time.Duration
if k.GetParams(ctx).CheckpointPollInterval > 0 {
checkpointPollInterval = k.GetParams(ctx).CheckpointPollInterval
} else {
checkpointPollInterval = helper.GetConfig().CheckpointerPollInterval
}

var checkpointTimeout time.Duration
tronDynamicFeature := util.GetFeatureConfig().GetFeature(ctx, featuremanagerTypes.TronDynamicCheckpoint)
if tronDynamicFeature.IsOpen {
tronMaxLength := tronDynamicFeature.IntConf["maxLength"]
checkpointTimeout, _ = helper.CalcCheckpointTimeout(tronMaxLength, checkpointPollInterval)
} else {
checkpointTimeout = bufferTime
}

// Fetch last checkpoint from store
// TODO figure out how to handle this error
lastCheckpoint, _ := k.GetLastCheckpoint(ctx, hmTypes.RootChainTypeStake)
lastCheckpointTime := time.Unix(int64(lastCheckpoint.TimeStamp), 0)

// If last checkpoint is not present or last checkpoint happens before checkpoint buffer time -- thrown an error
if lastCheckpointTime.After(currentTime) || (currentTime.Sub(lastCheckpointTime) < bufferTime) {
logger.Debug("Invalid No ACK -- Waiting for last checkpoint ACK")
if lastCheckpointTime.After(currentTime) || (currentTime.Sub(lastCheckpointTime) < checkpointTimeout) {
logger.Debug("Invalid No ACK -- Waiting for last checkpoint ACK", "lastCheckpoint", lastCheckpoint,
"lastCheckpointTime", lastCheckpointTime, "currentTime", currentTime,
"checkpointTimeout", checkpointTimeout)
return common.ErrInvalidNoACK(k.Codespace()).Result()
}

Expand Down
Loading
Loading