Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removed the request from the pool if the the proposal is bad #4572

Merged
merged 1 commit into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions integration/smartbft/smartbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -1809,6 +1810,115 @@ var _ = Describe("EndToEnd Smart BFT configuration test", func() {
By("invoking the chaincode")
invokeQuery(network, peer, network.Orderers[1], channel, 90)
})

It("send an update transaction to each orderer and deleting failed requests", func() {
channel := "testchannel1"
By("Create network")
networkConfig := nwo.MultiNodeSmartBFT()
networkConfig.Channels = nil
network = nwo.New(networkConfig, testDir, client, StartPort(), components)
network.GenerateConfigTree()
network.Bootstrap()
network.EventuallyTimeout *= 2

By("Start orderers")
var ordererRunners []*ginkgomon.Runner
for _, orderer := range network.Orderers {
runner := network.OrdererRunner(orderer,
"FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug",
"ORDERER_GENERAL_BACKOFF_MAXDELAY=20s")
ordererRunners = append(ordererRunners, runner)
proc := ifrit.Invoke(runner)
ordererProcesses = append(ordererProcesses, proc)
Eventually(proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
}

By("Start peer")
peerRunner := network.PeerGroupRunner()
peerProcesses = ifrit.Invoke(peerRunner)
Eventually(peerProcesses.Ready(), network.EventuallyTimeout).Should(BeClosed())
peer := network.Peer("Org1", "peer0")

By("Join network to channel")
joinChannel(network, channel)

By("Waiting for followers to see the leader")
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))

By("Joining peers to channel")
orderer1 := network.Orderers[0]
network.JoinChannel(channel, orderer1, network.PeersWithChannel(channel)...)

By("Deploying chaincode")
deployChaincode(network, channel, testDir)

By("querying the chaincode")
sess, err := network.PeerUserSession(peer, "User1", commands.ChaincodeQuery{
ChannelID: channel,
Name: "mycc",
Ctor: `{"Args":["query","a"]}`,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say("100"))

By("invoking the chaincode")
invokeQuery(network, peer, network.Orderers[1], channel, 90)

By("Changing the channel config, e.g. maximum number of bytes in a 1 MB batch")
newAbsoluteMaxBytes := 1_000_000
config := nwo.GetConfig(network, peer, orderer1, channel)
updatedConfig := proto.Clone(config).(*common.Config)
batchSizeConfigValue := updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"]
batchSizeValue := &ordererProtos.BatchSize{}
Expect(proto.Unmarshal(batchSizeConfigValue.Value, batchSizeValue)).To(Succeed())
batchSizeValue.AbsoluteMaxBytes = uint32(newAbsoluteMaxBytes)
updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"] = &common.ConfigValue{
ModPolicy: "Admins",
Value: protoutil.MarshalOrPanic(batchSizeValue),
}

tempDir, err := os.MkdirTemp(network.RootDir, "updateConfig")
Expect(err).NotTo(HaveOccurred())
updateFile := filepath.Join(tempDir, "update.pb")
defer os.RemoveAll(tempDir)

nwo.ComputeUpdateOrdererConfig(updateFile, network, channel, config, updatedConfig, peer, orderer1)

fileData, err := os.ReadFile(updateFile)
Expect(err).NotTo(HaveOccurred())

ctxEnv, err := protoutil.UnmarshalEnvelope(fileData)
Expect(err).NotTo(HaveOccurred())

By("Begin broadcast")
var wg sync.WaitGroup
for i := range network.Orderers {
wg.Add(1)
go func(ccid int) {
defer wg.Done()
resp, err := ordererclient.Broadcast(network, network.Orderers[ccid], ctxEnv)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
}(i)
}
wg.Wait()
By("End broadcast")

// move the logger cursor to the beginning of the broadcast
Eventually(ordererRunners[0].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))

// the channel update transaction will have to be deleted from each orderer
Eventually(ordererRunners[0].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
})
})
})

Expand Down
8 changes: 8 additions & 0 deletions orderer/consensus/smartbft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,13 @@ func (c *BFTChain) pruneCommittedRequests(block *cb.Block) {
wg.Wait()
}

func (c *BFTChain) pruneBadRequests() {
c.consensus.Pool.Prune(func(req []byte) error {
_, err := c.consensus.Verifier.VerifyRequest(req)
return err
})
}

func (c *BFTChain) submit(env *cb.Envelope, configSeq uint64) error {
reqBytes, err := proto.Marshal(env)
if err != nil {
Expand Down Expand Up @@ -566,6 +573,7 @@ func (c *BFTChain) updateRuntimeConfig(block *cb.Block) types.Reconfig {
if protoutil.IsConfigBlock(block) {
c.Comm.Configure(c.Channel, newRTC.RemoteNodes)
c.clusterService.ConfigureNodeCerts(c.Channel, newRTC.consenters)
c.pruneBadRequests()
}

membershipDidNotChange := reflect.DeepEqual(newRTC.Nodes, prevRTC.Nodes)
Expand Down