Skip to content

Commit

Permalink
chore: refactor tally querying and types
Browse files Browse the repository at this point in the history
  • Loading branch information
freak12techno committed Dec 20, 2023
1 parent ab5a1e6 commit 776a5cc
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 312 deletions.
108 changes: 4 additions & 104 deletions pkg/data/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ func (m *Manager) GetTallies() (map[string]types.ChainTallyInfos, error) {
var mutex sync.Mutex

errors := make([]error, 0)

pools := make(map[string]types.Pool, 0)
proposals := make(map[string][]types.Proposal, 0)
tallies := make(map[string]map[string]types.Tally, 0)
tallies := make(map[string]types.ChainTallyInfos, 0)

for _, chain := range m.Chains {
rpc := cosmos.NewRPC(chain, m.Logger)
Expand All @@ -40,81 +37,18 @@ func (m *Manager) GetTallies() (map[string]types.ChainTallyInfos, error) {
go func(c *types.Chain, rpc *cosmos.RPC) {
defer wg.Done()

pool, err := rpc.GetStakingPool()
talliesForChain, err := rpc.GetTallies()

mutex.Lock()

if err != nil {
m.Logger.Error().Err(err).Str("chain", c.Name).Msg("Error fetching staking pool")
errors = append(errors, err)
} else if pool.Pool == nil {
m.Logger.Error().Err(err).Str("chain", c.Name).Msg("Staking pool is empty!")
errors = append(errors, fmt.Errorf("staking pool is empty"))
} else {
pools[c.Name] = *pool.Pool
tallies[c.Name] = talliesForChain
}
mutex.Unlock()
}(chain, rpc)

wg.Add(1)
go func(c *types.Chain, rpc *cosmos.RPC) {
defer wg.Done()

chainProposals, err := rpc.GetAllProposals()

mutex.Lock()

if err != nil {
m.Logger.Error().Err(err).Str("chain", c.Name).Msg("Error fetching chain proposals")
errors = append(errors, err)

mutex.Unlock()
return
} else {
proposals[c.Name] = chainProposals
}

mutex.Unlock()

var internalWg sync.WaitGroup

for _, proposal := range chainProposals {
internalWg.Add(1)

go func(c *types.Chain, p types.Proposal) {
defer internalWg.Done()

tally, err := rpc.GetTally(p.ID)

mutex.Lock()
defer mutex.Unlock()

if err != nil {
m.Logger.Error().
Err(err).
Str("chain", c.Name).
Str("proposal_id", p.ID).
Msg("Error fetching tally for proposal")
errors = append(errors, err)
} else if tally.Tally == nil {
m.Logger.Error().
Err(err).
Str("chain", c.Name).
Str("proposal_id", p.ID).
Msg("Tally is empty")
errors = append(errors, fmt.Errorf("tally is empty"))
} else {
if _, ok := tallies[c.Name]; !ok {
tallies[c.Name] = make(map[string]types.Tally, 0)
}

tallies[c.Name][p.ID] = *tally.Tally
}
}(c, proposal)
}

internalWg.Wait()
}(chain, rpc)
}

wg.Wait()
Expand All @@ -124,41 +58,7 @@ func (m *Manager) GetTallies() (map[string]types.ChainTallyInfos, error) {
return map[string]types.ChainTallyInfos{}, fmt.Errorf("could not get tallies info: got %d errors", len(errors))
}

tallyInfos := make(map[string]types.ChainTallyInfos, 0)

for chainName, chainProposals := range proposals {
chain := m.Chains.FindByName(chainName)
if chain == nil {
return map[string]types.ChainTallyInfos{}, fmt.Errorf("could not chain with name %s", chainName)
}

if _, ok := tallyInfos[chainName]; !ok {
tallyInfos[chainName] = types.ChainTallyInfos{
Chain: chain,
TallyInfos: make([]types.TallyInfo, len(chainProposals)),
}
}

for index, proposal := range chainProposals {
tally, ok := tallies[chainName][proposal.ID]
if !ok {
return map[string]types.ChainTallyInfos{}, fmt.Errorf("could not get tallies info")
}

pool, ok := pools[chainName]
if !ok {
return map[string]types.ChainTallyInfos{}, fmt.Errorf("could not get tallies info")
}

tallyInfos[chainName].TallyInfos[index] = types.TallyInfo{
Proposal: proposal,
Tally: tally,
Pool: pool,
}
}
}

return tallyInfos, nil
return tallies, nil
}

func (m *Manager) GetChainParams(chain *types.Chain) (*types.ChainWithVotingParams, []error) {
Expand Down
129 changes: 5 additions & 124 deletions pkg/fetchers/cosmos/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package cosmos

import (
"encoding/json"
"errors"
"fmt"
"main/pkg/fetchers/cosmos/responses"
"main/pkg/utils"
"net/http"
"strings"
"time"

"main/pkg/types"
Expand All @@ -18,13 +15,15 @@ import (
const PaginationLimit = 1000

type RPC struct {
ChainConfig *types.Chain
URLs []string
ProposalsType string
Logger zerolog.Logger
}

func NewRPC(chainConfig *types.Chain, logger zerolog.Logger) *RPC {
return &RPC{
ChainConfig: chainConfig,
URLs: chainConfig.LCDEndpoints,
ProposalsType: chainConfig.ProposalsType,
Logger: logger.With().Str("component", "rpc").Logger(),
Expand All @@ -39,139 +38,21 @@ func (rpc *RPC) GetAllProposals() ([]types.Proposal, *types.QueryError) {
return rpc.GetAllV1beta1Proposals()
}

func (rpc *RPC) GetAllV1beta1Proposals() ([]types.Proposal, *types.QueryError) {
proposals := []types.Proposal{}
offset := 0

for {
url := fmt.Sprintf(
// 2 is for PROPOSAL_STATUS_VOTING_PERIOD
"/cosmos/gov/v1beta1/proposals?pagination.limit=%d&pagination.offset=%d&proposal_status=2",
PaginationLimit,
offset,
)

var batchProposals responses.V1Beta1ProposalsRPCResponse
if errs := rpc.Get(url, &batchProposals); len(errs) > 0 {
return nil, &types.QueryError{
QueryError: nil,
NodeErrors: errs,
}
}

if batchProposals.Message != "" {
return nil, &types.QueryError{
QueryError: errors.New(batchProposals.Message),
}
}

parsedProposals := utils.Map(batchProposals.Proposals, func(p responses.V1beta1Proposal) types.Proposal {
return p.ToProposal()
})
proposals = append(proposals, parsedProposals...)
if len(batchProposals.Proposals) < PaginationLimit {
break
}

offset += PaginationLimit
}

return proposals, nil
}

func (rpc *RPC) GetAllV1Proposals() ([]types.Proposal, *types.QueryError) {
proposals := []types.Proposal{}
offset := 0

for {
url := fmt.Sprintf(
// 2 is for PROPOSAL_STATUS_VOTING_PERIOD
"/cosmos/gov/v1/proposals?pagination.limit=%d&pagination.offset=%d&proposal_status=2",
PaginationLimit,
offset,
)

var batchProposals responses.V1ProposalsRPCResponse
if errs := rpc.Get(url, &batchProposals); len(errs) > 0 {
return nil, &types.QueryError{
QueryError: nil,
NodeErrors: errs,
}
}

if batchProposals.Message != "" {
return nil, &types.QueryError{
QueryError: errors.New(batchProposals.Message),
}
}

parsedProposals := utils.Map(batchProposals.Proposals, func(p responses.V1Proposal) types.Proposal {
return p.ToProposal()
})
proposals = append(proposals, parsedProposals...)
if len(batchProposals.Proposals) < PaginationLimit {
break
}

offset += PaginationLimit
}

return proposals, nil
}

func (rpc *RPC) GetVote(proposal, voter string) (*types.Vote, *types.QueryError) {
url := fmt.Sprintf(
"/cosmos/gov/v1beta1/proposals/%s/votes/%s",
proposal,
voter,
)

var vote responses.VoteRPCResponse
if errs := rpc.Get(url, &vote); len(errs) > 0 {
return nil, &types.QueryError{
QueryError: nil,
NodeErrors: errs,
}
}

if vote.IsError() {
// not voted
if strings.Contains(vote.Message, "not found") {
return nil, nil
}

// some other errors
return nil, &types.QueryError{
QueryError: errors.New(vote.Message),
}
}

voteParsed, err := vote.ToVote()
if err != nil {
return nil, &types.QueryError{
QueryError: err,
NodeErrors: nil,
}
}

return voteParsed, nil
}

func (rpc *RPC) GetTally(proposal string) (*types.TallyRPCResponse, *types.QueryError) {
func (rpc *RPC) GetTally(proposal string) (*types.Tally, *types.QueryError) {
url := fmt.Sprintf(
"/cosmos/gov/v1beta1/proposals/%s/tally",
proposal,
)

var tally types.TallyRPCResponse
var tally responses.TallyRPCResponse
if errs := rpc.Get(url, &tally); len(errs) > 0 {
return nil, &types.QueryError{
QueryError: nil,
NodeErrors: errs,
}
}

return &tally, nil
return tally.Tally.ToTally(), nil
}

func (rpc *RPC) GetStakingPool() (*types.PoolRPCResponse, *types.QueryError) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/fetchers/cosmos/proposals_v1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package cosmos

import (
"errors"
"fmt"
"main/pkg/fetchers/cosmos/responses"
"main/pkg/types"
"main/pkg/utils"
)

func (rpc *RPC) GetAllV1Proposals() ([]types.Proposal, *types.QueryError) {
proposals := []types.Proposal{}
offset := 0

for {
url := fmt.Sprintf(
// 2 is for PROPOSAL_STATUS_VOTING_PERIOD
"/cosmos/gov/v1/proposals?pagination.limit=%d&pagination.offset=%d&proposal_status=2",
PaginationLimit,
offset,
)

var batchProposals responses.V1ProposalsRPCResponse
if errs := rpc.Get(url, &batchProposals); len(errs) > 0 {
return nil, &types.QueryError{
QueryError: nil,
NodeErrors: errs,
}
}

if batchProposals.Message != "" {
return nil, &types.QueryError{
QueryError: errors.New(batchProposals.Message),
}
}

parsedProposals := utils.Map(batchProposals.Proposals, func(p responses.V1Proposal) types.Proposal {
return p.ToProposal()
})
proposals = append(proposals, parsedProposals...)
if len(batchProposals.Proposals) < PaginationLimit {
break
}

offset += PaginationLimit
}

return proposals, nil
}
Loading

0 comments on commit 776a5cc

Please sign in to comment.