Skip to content

Commit

Permalink
add callback support when leader is [de]selected
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Nov 22, 2024
1 parent 4a76b9c commit cb68ea7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
9 changes: 8 additions & 1 deletion examples/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ func main() {

defer db.Close()
quit, cancel := context.WithCancel(ctx)
lock := spindle.New(db, *table, *name, spindle.WithDuration(10000))
lock := spindle.New(db,
*table,
*name,
spindle.WithDuration(10000),
spindle.WithLeaderCallback(nil, func(d interface{}, m []byte) {
log.Println("callback:", string(m))
}),
)

done := make(chan error, 1)
lock.Run(quit, done) // start main loop
Expand Down
30 changes: 30 additions & 0 deletions spindle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
ErrNotRunning = fmt.Errorf("spindle: not running")
)

type FnLeaderCallback func(data interface{}, msg []byte)

type Option interface {
Apply(*Lock)
}
Expand All @@ -39,6 +41,23 @@ func (w withDuration) Apply(o *Lock) { o.duration = int64(w) }
// WithDuration sets the locker's lease duration in ms. Minimum is 1000ms.
func WithDuration(v int64) Option { return withDuration(v) }

type withLeaderCallback struct {
d interface{}
h FnLeaderCallback
}

func (w withLeaderCallback) Apply(o *Lock) {
o.cbLeaderData = w.d
o.cbLeader = w.h
}

// WithLeaderCallback sets the node's callback function when it a
// leader is selected (or deselected). The msg arg for h will be
// set to either 0 or 1.
func WithLeaderCallback(d interface{}, h FnLeaderCallback) Option {
return withLeaderCallback{d, h}
}

type withLogger struct{ l *log.Logger }

func (w withLogger) Apply(o *Lock) { o.logger = w.l }
Expand All @@ -57,6 +76,9 @@ type Lock struct {
mtx *sync.Mutex
logger *log.Logger
active atomic.Int32

cbLeader FnLeaderCallback // leader callback
cbLeaderData interface{} // arbitrary data passed to fnLeader
}

// Run starts the main lock loop which can be canceled using the input context. You can
Expand Down Expand Up @@ -92,6 +114,12 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
}
}()

leaderCallback := func(state int) {
if l.cbLeader != nil {
l.cbLeader(l.cbLeaderData, []byte(fmt.Sprintf("%d", state)))
}
}

locked := func() bool {
// See if there is an active leased lock (could be us, could be someone else).
token, diff, err := l.checkLock()
Expand All @@ -106,6 +134,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
l.heartbeat() // only on 1
}

leaderCallback(1)
l.logger.Println("leader active (me)")
return true
}
Expand All @@ -123,6 +152,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
}

if ok {
leaderCallback(0)
l.logger.Println("leader active (not me)")
leader.Store(0) // reset heartbeat
return true
Expand Down

0 comments on commit cb68ea7

Please sign in to comment.