Skip to content
This repository was archived by the owner on Apr 4, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion http.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (rn *Node) peerMembersHandlerFunc() func(http.ResponseWriter, *http.Request
}

func (rn *Node) handlePeerMembersRequest(w http.ResponseWriter, req *http.Request) {
if !rn.initialized {
if !rn.initialized.IsSet() {
rn.writeNodeNotReady(w)
} else {
membersResp := &cTypes.ConfigMembershipResponseData{
Expand Down
38 changes: 19 additions & 19 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type Node struct {

raftConfig *raft.Config

started bool
initialized bool
running bool
started AtomicBool
initialized AtomicBool
running AtomicBool

proposeC chan string
fsm FSM
Expand Down Expand Up @@ -210,7 +210,7 @@ func (rn *Node) Start() error {

walEnabled := rn.walDir() != ""
rejoinCluster := rn.shouldRejoinCluster()
if rn.started {
if rn.started.IsSet() {
return nil
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (rn *Node) Start() error {
}
rn.logger.Debug("Successfully advanced election ticks")

rn.initialized = true
rn.initialized.Set()

go func(rn *Node) {
rn.logger.Info("Scanning for new raft logs")
Expand Down Expand Up @@ -313,7 +313,7 @@ func (rn *Node) Start() error {
rn.logger.Fatalf("%+v", err)
}
}(rn)
rn.started = true
rn.started.Set()

// TODO: add case for when no peers or bootstrap specified it waits to get added.
if rejoinCluster {
Expand All @@ -329,13 +329,13 @@ func (rn *Node) Start() error {
}

// final step to mark node as initialized
rn.running = true
rn.running.Set()
return nil
}

// IsRunning reports if the raft node is running
func (rn *Node) IsRunning() bool {
return rn.running
return rn.running.IsSet()
}

// Stop will stop the raft node.
Expand All @@ -348,12 +348,12 @@ func (rn *Node) Stop() error {
rn.logger.Debug("Stopping raft transporter")
rn.transport.Stop()
// TODO: Don't poll stuff here
for rn.running {
for rn.running.IsSet() {
time.Sleep(200 * time.Millisecond)
}
rn.logger.Info("Canoe has stopped")
rn.started = false
rn.initialized = false
rn.started.Unset()
rn.initialized.Unset()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you rely on the ordering of these atomic bool changes? Because ordering like this on two different memory locations is not guaranteed.

Golang's memory model in the spec says:

That is, compilers and processors may reorder the reads and writes executed within a single goroutine only when the reordering does not change the behavior within that goroutine as defined by the language specification. Because of this reordering, the execution order observed by one goroutine may differ from the order perceived by another.

So you can't rely on the order these are set being the same between two separate goroutines running the same code potentially.

This may well not matter, but if it breaks assumptions to re-order these randomly then it's still racey.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, no, the orderings don't matter. Just that they are both unset

return nil
}

Expand All @@ -369,12 +369,12 @@ func (rn *Node) Destroy() error {
}
rn.logger.Debug("Successfully removed self from canoe cluster")

if rn.running {
if rn.running.IsSet() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If two goroutines can be running this it's still racey since they might both observe it being set and run this section.

Even if you are sure this can only be run by a single goroutine, it's still potentially racey because other goroutines might still get a response to node.isHealthy() of true even after the stop chan is closed and transport is partially shutdown. That might not cause actual problems in practice but it probably violates assumptions about what is safe to assume and what isn't since none are documented in the methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm. Yeah you're right. I'll look back over and see about using mutexes possibly - or some other construct. Or a check and set possibly

close(rn.stopc)
rn.logger.Debug("Stopping raft transport layer")
rn.transport.Stop()
// TODO: Have a stopped chan for triggering this action
for rn.running {
for rn.running.IsSet() {
time.Sleep(200 * time.Millisecond)
}
}
Expand All @@ -383,8 +383,8 @@ func (rn *Node) Destroy() error {
rn.deletePersistentData()
rn.logger.Debug("Successfully deleted persistent data")

rn.started = false
rn.initialized = false
rn.started.Unset()
rn.initialized.Unset()
return nil
}

Expand Down Expand Up @@ -469,7 +469,7 @@ func nonInitNode(args *NodeConfig) (*Node, error) {
raftPort: args.RaftPort,
configPort: args.ConfigurationPort,
fsm: args.FSM,
initialized: false,
initialized: 0,
observers: make(map[uint64]*Observer),
peerMap: make(map[uint64]cTypes.Peer),
initBackoffArgs: args.InitBackoff,
Expand Down Expand Up @@ -633,12 +633,12 @@ func (rn *Node) proposePeerDeletion(delReq *raftpb.ConfChange, async bool) error
}

func (rn *Node) canAlterPeer() bool {
return rn.isHealthy() && rn.initialized
return rn.isHealthy() && rn.initialized.IsSet()
}

// TODO: Define healthy better
func (rn *Node) isHealthy() bool {
return rn.running
return rn.running.IsSet()
}

func (rn *Node) scanReady() error {
Expand All @@ -649,7 +649,7 @@ func (rn *Node) scanReady() error {
}
}()
defer func(rn *Node) {
rn.running = false
rn.running.Unset()
}(rn)

var snapTicker *time.Ticker
Expand Down
15 changes: 15 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,24 @@ package canoe
import (
"encoding/binary"
"github.com/satori/go.uuid"
"sync/atomic"
)

// Uint64UUID returns a UUID encoded to uint64
func Uint64UUID() uint64 {
return binary.LittleEndian.Uint64(uuid.NewV4().Bytes())
}

type AtomicBool int32

func (a *AtomicBool) Set() {
atomic.StoreInt32((*int32)(a), 1)
}

func (a *AtomicBool) Unset() {
atomic.StoreInt32((*int32)(a), 0)
}

func (a *AtomicBool) IsSet() bool {
return atomic.LoadInt32((*int32)(a)) == 1
}