diff --git a/event_log.go b/event_log.go index aa17dad..1394ac9 100644 --- a/event_log.go +++ b/event_log.go @@ -5,39 +5,66 @@ package sse import ( + "container/list" "strconv" "time" ) -// EventLog holds all of previous events -type EventLog []*Event +// Events holds all of previous events +type Events []*Event -// Add event to eventlog +// EventLog holds the log of all of previous events +type EventLog struct { + MaxEntries int + log *list.List +} + +func newEventLog(maxEntries int) *EventLog { + return &EventLog{ + MaxEntries: maxEntries, + log: list.New(), + } +} + +// Add event to log func (e *EventLog) Add(ev *Event) { if !ev.hasContent() { return } - ev.ID = []byte(e.currentindex()) + ev.ID = []byte(e.currentIndex()) ev.timestamp = time.Now() - *e = append(*e, ev) + + // if MaxEntries is set to greater than 0 (no limit) check entries + if e.MaxEntries > 0 { + // if we are at max entries limit + // then remove the item at the back + if e.log.Len() >= e.MaxEntries { + e.log.Remove(e.log.Back()) + } + } + e.log.PushFront(ev) } -// Clear events from eventlog +// Clear events from log func (e *EventLog) Clear() { - *e = nil + e.log.Init() } // Replay events to a subscriber func (e *EventLog) Replay(s *Subscriber) { - for i := 0; i < len(*e); i++ { - id, _ := strconv.Atoi(string((*e)[i].ID)) + for l := e.log.Back(); l != nil; l = l.Prev() { + id, _ := strconv.Atoi(string(l.Value.(*Event).ID)) if id >= s.eventid { - s.connection <- (*e)[i] + s.connection <- l.Value.(*Event) } } } -func (e *EventLog) currentindex() string { - return strconv.Itoa(len(*e)) +func (e *EventLog) currentIndex() string { + return strconv.Itoa(e.log.Len()) +} + +func (e *EventLog) Len() int { + return e.log.Len() } diff --git a/event_log_test.go b/event_log_test.go index 66cfaf2..bc78244 100644 --- a/event_log_test.go +++ b/event_log_test.go @@ -11,16 +11,27 @@ import ( ) func TestEventLog(t *testing.T) { - ev := make(EventLog, 0) + ev := newEventLog(0) testEvent := &Event{Data: []byte("test")} ev.Add(testEvent) ev.Clear() - assert.Equal(t, 0, len(ev)) + assert.Equal(t, 0, ev.Len()) ev.Add(testEvent) ev.Add(testEvent) - assert.Equal(t, 2, len(ev)) + assert.Equal(t, 2, ev.Len()) +} + +func TestEventLogMaxEntries(t *testing.T) { + ev := newEventLog(2) + testEvent := &Event{Data: []byte("test")} + + ev.Add(testEvent) + ev.Add(testEvent) + ev.Add(testEvent) + + assert.Equal(t, 2, ev.Len()) } diff --git a/go.mod b/go.mod index aa376f2..a648d02 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,15 @@ module github.com/r3labs/sse/v2 -go 1.13 +go 1.20 require ( github.com/stretchr/testify v1.7.0 - golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 ) + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.0.0-20210610124326-52da8fb2a613 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum index 31187d6..2042974 100644 --- a/go.sum +++ b/go.sum @@ -5,11 +5,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ= -golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/net v0.0.0-20210610124326-52da8fb2a613 h1:SqvqnUCcwFhyyRueFOEFTBaWeXYwK+CL/767809IlbQ= +golang.org/x/net v0.0.0-20210610124326-52da8fb2a613/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/server.go b/server.go index d1b27af..bf8efef 100644 --- a/server.go +++ b/server.go @@ -82,7 +82,24 @@ func (s *Server) CreateStream(id string) *Stream { return s.streams[id] } - str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe) + str := newStream(id, s.BufferSize, 0, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe) + str.run() + + s.streams[id] = str + + return str +} + +// CreateStreamWithOpts will create a new stream with options and register it +func (s *Server) CreateStreamWithOpts(id string, opts StreamOpts) *Stream { + s.muStreams.Lock() + defer s.muStreams.Unlock() + + if s.streams[id] != nil { + return s.streams[id] + } + + str := newStream(id, s.BufferSize, opts.MaxEntries, opts.AutoReplay, opts.AutoStream, s.OnSubscribe, s.OnUnsubscribe) str.run() s.streams[id] = str diff --git a/stream.go b/stream.go index bfbcb9b..c7f3640 100644 --- a/stream.go +++ b/stream.go @@ -19,7 +19,7 @@ type Stream struct { register chan *Subscriber deregister chan *Subscriber subscribers []*Subscriber - Eventlog EventLog + Eventlog *EventLog subscriberCount int32 // Enables replaying of eventlog to newly added subscribers AutoReplay bool @@ -30,8 +30,17 @@ type Stream struct { OnUnsubscribe func(streamID string, sub *Subscriber) } +type StreamOpts struct { + // Max amount of log entries per stream + MaxEntries int + // Enables creation of a stream when a client connects + AutoStream bool + // Enables automatic replay for each new subscriber that connects + AutoReplay bool +} + // newStream returns a new stream -func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream { +func newStream(id string, buffSize, maxEntries int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream { return &Stream{ ID: id, AutoReplay: replay, @@ -41,7 +50,7 @@ func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, deregister: make(chan *Subscriber), event: make(chan *Event, buffSize), quit: make(chan struct{}), - Eventlog: make(EventLog, 0), + Eventlog: newEventLog(maxEntries), OnSubscribe: onSubscribe, OnUnsubscribe: onUnsubscribe, } diff --git a/stream_test.go b/stream_test.go index 1c89a6e..90366c0 100644 --- a/stream_test.go +++ b/stream_test.go @@ -16,7 +16,7 @@ import ( // Maybe fix this in the future so we can test with -race enabled func TestStreamAddSubscriber(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -34,7 +34,7 @@ func TestStreamAddSubscriber(t *testing.T) { } func TestStreamRemoveSubscriber(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -47,7 +47,7 @@ func TestStreamRemoveSubscriber(t *testing.T) { } func TestStreamSubscriberClose(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -59,7 +59,7 @@ func TestStreamSubscriberClose(t *testing.T) { } func TestStreamDisableAutoReplay(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -74,7 +74,7 @@ func TestStreamDisableAutoReplay(t *testing.T) { func TestStreamMultipleSubscribers(t *testing.T) { var subs []*Subscriber - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() for i := 0; i < 10; i++ {