Skip to content

Commit

Permalink
feat(event/handler): add (*Handler).EventHandler() method
Browse files Browse the repository at this point in the history
  • Loading branch information
bounoable committed Nov 17, 2022
1 parent 956cc24 commit 62ee30b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
1 change: 1 addition & 0 deletions backend/mongo/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func NoIndex(ni bool) EventStoreOption {

// WithIndices returns an EventStoreOption that creates additional indices for
// the event collection. Can be used to create builtin edge-case indices:
//
// WithIndices(indices.EventStore.NameAndVersion)
func WithIndices(models ...mongo.IndexModel) EventStoreOption {
return func(s *EventStore) {
Expand Down
26 changes: 18 additions & 8 deletions event/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ var ErrRunning = errors.New("event handler is already running")
//
// errs, err := h.Run(context.TODO())
type Handler struct {
bus event.Bus
store event.Store // optional
bus event.Bus
store event.Store // optional

mux sync.RWMutex
handlers map[string]func(event.Event)
eventNames map[string]struct{}

mux sync.RWMutex
ctx context.Context
ctx context.Context
}

// Option is a handler option.
Expand Down Expand Up @@ -59,13 +59,23 @@ func New(bus event.Bus, opts ...Option) *Handler {
}

// RegisterEventHandler registers the handler for the given event.
// Events must be registered before h.Run() is called. events that are
// Events must be registered before h.Run() is called. Events that are
// registered after h.Run() has been called, won't be handled.
func (h *Handler) RegisterEventHandler(name string, fn func(event.Event)) {
h.mux.Lock()
defer h.mux.Unlock()
h.handlers[name] = fn
h.eventNames[name] = struct{}{}
}

// EventHandler returns the event handler for the given event name.
func (h *Handler) EventHandler(name string) (func(event.Event), bool) {
h.mux.RLock()
defer h.mux.RUnlock()
fn, ok := h.handlers[name]
return fn, ok
}

// Context returns the context that was passed to h.Run(). If h.Run() has not
// been called yet, nil is returned.
func (h *Handler) Context() context.Context {
Expand Down Expand Up @@ -106,7 +116,7 @@ func (h *Handler) Run(ctx context.Context) (<-chan error, error) {

go func() {
if err := streams.Walk(ctx, func(evt event.Event) error {
if fn, ok := h.handlers[evt.Name()]; ok {
if fn, ok := h.EventHandler(evt.Name()); ok {
fn(evt)
}
return nil
Expand Down Expand Up @@ -136,7 +146,7 @@ func (h *Handler) handleStoredEvents(ctx context.Context, eventNames []string) e
}

return streams.Walk(ctx, func(evt event.Event) error {
if fn, ok := h.handlers[evt.Name()]; ok {
if fn, ok := h.EventHandler(evt.Name()); ok {
fn(evt)
}
return nil
Expand Down

0 comments on commit 62ee30b

Please sign in to comment.