Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support user defined user event coalescer #763

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions serf/coalesce.go
Original file line number Diff line number Diff line change
@@ -7,10 +7,10 @@ import (
"time"
)

// coalescer is a simple interface that must be implemented to be
// Coalescer is a simple interface that must be implemented to be
// used inside of a coalesceLoop
type coalescer interface {
// Can the coalescer handle this event, if not it is
type Coalescer interface {
// Can the Coalescer handle this event, if not it is
// directly passed through to the destination channel
Handle(Event) bool

@@ -22,9 +22,9 @@ type coalescer interface {
}

// coalescedEventCh returns an event channel where the events are coalesced
// using the given coalescer.
// using the given Coalescer.
func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event {
cPeriod time.Duration, qPeriod time.Duration, c Coalescer) chan<- Event {
inCh := make(chan Event, 1024)
go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c)
return inCh
@@ -33,7 +33,7 @@ func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
// coalesceLoop is a simple long-running routine that manages the high-level
// flow of coalescing based on quiescence and a maximum quantum period.
func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{},
coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) {
coalescePeriod time.Duration, quiescentPeriod time.Duration, c Coalescer) {
var quiescent <-chan time.Time
var quantum <-chan time.Time
shutdown := false
3 changes: 3 additions & 0 deletions serf/config.go
Original file line number Diff line number Diff line change
@@ -95,6 +95,9 @@ type Config struct {
UserCoalescePeriod time.Duration
UserQuiescentPeriod time.Duration

// Optional user defined Coalescer for user events.
UserEventCoalescer Coalescer

// The settings below relate to Serf keeping track of recently
// failed/left nodes and attempting reconnects.
//
7 changes: 5 additions & 2 deletions serf/serf.go
Original file line number Diff line number Diff line change
@@ -303,8 +303,11 @@ func Create(conf *Config) (*Serf, error) {

// Check if user event coalescing is enabled
if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil {
c := &userEventCoalescer{
events: make(map[string]*latestUserEvents),
c := conf.UserEventCoalescer
if c == nil {
c = &userEventCoalescer{
events: make(map[string]*latestUserEvents),
}
}

conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,